示例工程开发

最近更新时间: 2026-03-13 09:03:00

环境准备

参考 spark开发->示例工程开发配置 环境

禁止:

严格禁止使用低版本kafka操作高版本的kafka的topic,如修改分区数和增加副本等操作。由此不合规操作导致的集群组件异常等后果,不属于TBDS责任范围(参考产品SLA),需自行承担相关责任!

代码逻辑说明

  1. 功能说明。
    本示例演示创建和删除topic等基本操作

  2. POM中添加依赖。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.2</version>
</dependency>
  1. 核心代码示例。

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.*;
import java.util.concurrent.ExecutionException;

public class KafkaClientExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.setProperty("java.security.auth.login.config", "/usr/local/service/kafka/config/kafka-gssapi-jaas.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        Properties props = new Properties();
        //kafka集群broker list
        props.put("bootstrap.servers", "x.x.x.x:9092");
        props.put("acks", "all");
        // kerberos认证
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "hadoop");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/var/krb5kdc/emr.keytab\" principal=\"xxxx/x.x.x.x@TBDS-XXXX\";");
        // 实例化adminClient
        AdminClient adminClient = AdminClient.create(props);
        // 创建topic
        NewTopic newTopic = new NewTopic("test_topic_001",4, (short) 1);
        Collection<NewTopic> newTopicList = new ArrayList<>();
        newTopicList.add(newTopic);
        adminClient.createTopics(newTopicList);
        // topic list
        System.out.println();
        ListTopicsResult result = adminClient.listTopics();
        Collection<TopicListing> topics = result.listings().get();
        topics.forEach(each -> System.out.println(each.name()));
        // describe topic
        Collection<String> topicNames = new ArrayList<>();
        topicNames.add("test_topic_001");
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicNames);
        Map<String, TopicDescription> map = describeTopicsResult.all().get();
        for (Map.Entry<String, TopicDescription> entry : map.entrySet()) {
            System.out.println("topicName:" + entry.getValue().name()); //当前topic的名字
            System.out.println("partition num:" + entry.getValue().partitions().size()); //当前topic的partition数量
            System.out.println("listp:");
            List<TopicPartitionInfo> listp = entry.getValue().partitions(); //拿到topic的partitions相关信息
            for (TopicPartitionInfo info : listp) {
                System.out.println("----------------------------------");
                System.out.println("info.partition():" + info.partition());
                System.out.println("info.leader().id():" + info.leader().id()); //领导者所在机器id,也就是机器编号配置文件中的service id
                System.out.println("info.leader().host():" + info.leader().host()); //领导者所在机器host ip
                System.out.println("info.leader().port():" + info.leader().port()); //领导者所在机器port
                System.out.println("info.replicas():" + info.replicas());  //副本的信息,有多少会拿到多少
                List<Node> listInfo = info.replicas();
                //输出node信息
                for (Node n : listInfo) {
                    System.out.println("info.id():" + n.id()); //副本所在的node id
                    System.out.println("info.host():" + n.host()); //副本所在node的host ip
                    System.out.println("info.port():" + n.port()); //副本所在node的port
                }
            }
        }
        // delete topic
        adminClient.deleteTopics(topicNames);
        // topic list
        System.out.println();
        result = adminClient.listTopics();
        topics = result.listings().get();
        topics.forEach(each -> System.out.println(each.name()));
        // close
        adminClient.close();
    }
}