常用命令

最近更新时间: 2026-03-13 09:03:00

在尝试kafka命令之前,有以下两项工作需要在client侧完成。

  • 准备JAAS配置文件:暂定文件名为/tmp/kafka-jaas-config.properties。
  • 准备sasl认证配置文件:暂定文件名为/tmp/kafka-sasl-config.properties。

准备JAAS配置文件

文件名为/tmp/kafka-jaas-config.properties,名称可随意取。不同的认证方式,文件内容不同,下面举例说明了两种常见的认证方式配置:


//SASL PLAIN认证方式下,文件内容如下:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="xxxx"
    password="xxxx@Tbds.com";
};

//SASL GSSAPI(Kerberos)认证方式下,文件内容如下:
//注意替换${keytab path}和"${principal name}
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="${keytab path}"
    principal="${principal name}";
};

准备sasl认证配置文件

文件名为/tmp/kafka-sasl-config.properties,名称可随意取。不同的认证方式,文件内容不同,下面举例说明了两种常见的认证方式配置:


//SASL PLAIN认证方式下,文件内容如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx@Tbds.com";

//SASL GSSAPI认证(即Kerberos认证)方式下,文件内容如下:
//注意替换${keytab path}和"${principal name}
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=hadoop
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="${keytab path}" principal="${principal name}";

创建topic


#如果kafka-sasl-config.properties中配置了sasl.jaas.config属性,则无须export KAFKA_OPTS
export KAFKA_OPTS='-Djava.security.auth.login.config=/tmp/kafka-jaas-config.properties';
/usr/local/service/kafka/bin/kafka-topics.sh --bootstrap-server 10.0.1.153:9092 --create --topic test_lucien_topic_002 --partitions 5 --replication-factor 2 --command-config /tmp/kafka-sasl-config.properties

查看topic

#如果kafka-sasl-config.properties中配置了sasl.jaas.config属性,则无须export KAFKA_OPTS
export KAFKA_OPTS='-Djava.security.auth.login.config=/tmp/kafka-jaas-config.properties';
/usr/local/service/kafka/bin/kafka-topics.sh --bootstrap-server 10.0.1.153:9092 --describe --topic test_lucien_topic_001 --command-config /tmp/kafka-sasl-config.properties

更新topic

#如果kafka-sasl-config.properties中配置了sasl.jaas.config属性,则无须export KAFKA_OPTS
export KAFKA_OPTS='-Djava.security.auth.login.config=/tmp/kafka-jaas-config.properties';
#修改分区数
/usr/local/service/kafka/bin/kafka-topics.sh --bootstrap-server  ${brokerIP}:9092 --alter --topic test_topic_001 --partitions 30 --command-config /tmp/kafka-sasl-config.properties

#修改配置
/usr/local/service/kafka/bin/kafka-configs.sh --bootstrap-server ${brokerIP}:9092 --entity-type topics --entity-name test_topic_001 --alter --add-config retention.ms=604800000 --command-config /tmp/kafka-sasl-config.properties

#删除配置
/usr/local/service/kafka/bin/kafka-configs.sh --bootstrap-server ${brokerIP}:9092 --entity-type topics --entity-name test_topic_001 --alter --delete-config retention.ms --command-config /tmp/kafka-sasl-config.properties

删除topic


#如果kafka-sasl-config.properties中配置了sasl.jaas.config属性,则无须export KAFKA_OPTS
export KAFKA_OPTS='-Djava.security.auth.login.config=/tmp/kafka-jaas-config.properties';
/usr/local/service/kafka/bin/kafka-topics.sh --bootstrap-server 10.0.1.153:9092 --delete --topic test_lucien_topic_001 --command-config /tmp/kafka-sasl-config.properties

生产数据


#如果kafka-sasl-config.properties中配置了sasl.jaas.config属性,则无须export KAFKA_OPTS
export KAFKA_OPTS='-Djava.security.auth.login.config=/tmp/kafka-jaas-config.properties';
/usr/local/service/kafka/bin/kafka-console-producer.sh --topic test_lucien_topic_001 --bootstrap-server  10.0.1.153:9092 --producer.config /tmp/kafka-sasl-config.properties

{"id":6,"name":"fengfeng6"}

消费数据


#如果kafka-sasl-config.properties中配置了sasl.jaas.config属性,则无须export KAFKA_OPTS
export KAFKA_OPTS='-Djava.security.auth.login.config=/tmp/kafka-jaas-config.properties';
/usr/local/service/kafka/bin/kafka-console-consumer.sh --topic test_lucien_topic_001 --from-beginning --bootstrap-server 10.0.1.153:9092 --consumer.config /tmp/kafka-sasl-config.properties