环境准备
参考“spark开发 -> 示例工程开发”配置环境。
禁止:
严格禁止使用低版本kafka操作高版本的kafka的topic,如修改分区数和增加副本等操作。由此不合规操作导致的集群组件异常等后果,不属于TBDS责任范围(参考产品SLA),需自行承担相关责任!
代码逻辑说明
功能说明。
本示例演示创建和删除topic等基本操作。
POM中添加依赖。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.2</version>
</dependency>
- 核心代码示例。
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
 * Demonstrates basic Kafka topic administration through {@link AdminClient}
 * on a Kerberos-secured cluster: create a topic, list topics, describe the
 * new topic, delete it, and list topics again.
 *
 * <p>The broker address, keytab path and principal are placeholders and must
 * be replaced with real values before running.
 */
public class KafkaClientExample {

    /** Name of the topic that this example creates, describes and deletes. */
    private static final String TOPIC_NAME = "test_topic_001";

    /**
     * Runs the example end to end.
     *
     * @param args unused
     * @throws ExecutionException   if any admin request fails on the broker side
     * @throws InterruptedException if the thread is interrupted while waiting for a result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.setProperty("java.security.auth.login.config", "/usr/local/service/kafka/config/kafka-gssapi-jaas.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

        Properties props = new Properties();
        // Kafka cluster broker list
        props.put("bootstrap.servers", "x.x.x.x:9092");
        // Kerberos authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "hadoop");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/var/krb5kdc/emr.keytab\" principal=\"xxxx/x.x.x.x@TBDS-XXXX\";");

        // try-with-resources guarantees the client is closed even when a request fails.
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Create the topic: 4 partitions, replication factor 1.
            NewTopic newTopic = new NewTopic(TOPIC_NAME, 4, (short) 1);
            // Admin requests are asynchronous; wait for completion so that the
            // following calls see the topic and creation errors are reported.
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();

            // topic list
            printTopicNames(adminClient);

            // describe topic
            Collection<String> topicNames = Collections.singletonList(TOPIC_NAME);
            printTopicDescriptions(adminClient, topicNames);

            // delete topic; wait for the request to complete before listing again
            adminClient.deleteTopics(topicNames).all().get();

            // topic list
            printTopicNames(adminClient);
        }
    }

    /** Prints a blank line followed by the name of every topic returned by the cluster. */
    private static void printTopicNames(AdminClient adminClient)
            throws ExecutionException, InterruptedException {
        System.out.println();
        Collection<TopicListing> topics = adminClient.listTopics().listings().get();
        topics.forEach(each -> System.out.println(each.name()));
    }

    /** Prints name, partition count, leader and replica nodes for each of the given topics. */
    private static void printTopicDescriptions(AdminClient adminClient, Collection<String> topicNames)
            throws ExecutionException, InterruptedException {
        Map<String, TopicDescription> descriptions = adminClient.describeTopics(topicNames).all().get();
        for (TopicDescription description : descriptions.values()) {
            List<TopicPartitionInfo> partitions = description.partitions();
            System.out.println("topicName:" + description.name());
            System.out.println("partition num:" + partitions.size());
            System.out.println("listp:");
            for (TopicPartitionInfo info : partitions) {
                System.out.println("----------------------------------");
                System.out.println("info.partition():" + info.partition());
                Node leader = info.leader();
                if (leader == null) {
                    // A partition may temporarily have no leader; avoid a NullPointerException.
                    System.out.println("info.leader():none");
                } else {
                    System.out.println("info.leader().id():" + leader.id());     // broker id of the leader
                    System.out.println("info.leader().host():" + leader.host()); // host of the leader
                    System.out.println("info.leader().port():" + leader.port()); // port of the leader
                }
                System.out.println("info.replicas():" + info.replicas());
                // Print every replica node of this partition.
                for (Node replica : info.replicas()) {
                    System.out.println("info.id():" + replica.id());
                    System.out.println("info.host():" + replica.host());
                    System.out.println("info.port():" + replica.port());
                }
            }
        }
    }
}