api接口

最近更新时间: 2026-03-13 09:03:00

api接口主要有:AdminClient (kafka 2.8.1 API)、KafkaProducer (kafka 2.8.1 API)、KafkaConsumer (kafka 2.8.1 API)

java api

生产者java api使用示例

//生产
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Minimal Kafka producer example: connects to a Kerberos-secured (SASL/GSSAPI)
 * cluster and sends ten string records to {@code test_topic_001}.
 */
public class KafkaJavaApiClientExample {

    /**
     * Builds the producer configuration, sends ten records and closes the client.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // JVM-wide Kerberos settings: JAAS login configuration and krb5 configuration.
        System.setProperty("java.security.auth.login.config", "/usr/local/service/kafka/config/kafka-gssapi-jaas.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "x.x.x.x:9092"); // Kafka cluster broker list
        props.put("acks", "all");
        props.put("retries", 1);                        // number of retries
        props.put("batch.size", 16384);                 // batch size in bytes
        props.put("linger.ms", 1);                      // time to wait for a batch to fill
        props.put("buffer.memory", 33554432);           // RecordAccumulator buffer size
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Kerberos authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "hadoop");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/var/krb5kdc/emr.keytab\" principal=\"xxxx/x.x.x.x@TBDS-XXXX\";");

        // try-with-resources guarantees the producer is closed even when send() throws,
        // so the client is never leaked.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test_topic_001", "ImKey-" + i, "ImValue-" + i));
            }
        }
    }
}

//编译
/usr/local/jdk/bin/javac -classpath kafka-clients-2.8.2.jar KafkaJavaApiClientExample.java
//打包
/usr/local/jdk/bin/jar -cvf KafkaJavaApiClientExample.jar KafkaJavaApiClientExample.class

//建议使用idea编译打包fat jar
//参考示例工程 kafka-examples/kafka-java-api-example 打包出 kafka-java-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean package

//运行
/usr/local/jdk/bin/java -classpath kafka-java-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar com.tencent.tbds.kafka.KafkaJavaApiClientExample

消费者java api使用示例


//消费
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal Kafka consumer example: connects to a Kerberos-secured (SASL/GSSAPI)
 * cluster, subscribes to {@code test_topic_001} and prints records until a poll
 * comes back empty.
 */
public class KafkaJavaApiConsumerClientExample {

    /**
     * Builds the consumer configuration, then polls and prints records in batches.
     *
     * @param args unused
     * @throws InterruptedException if the pause between batches is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        // JVM-wide Kerberos settings: JAAS login configuration and krb5 configuration.
        System.setProperty("java.security.auth.login.config", "/usr/local/service/kafka/config/kafka-gssapi-jaas.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "x.x.x.x:9092");          // Kafka cluster broker list
        props.put("group.id", "test-consumer-group111");         // consumers sharing a group.id form one consumer group
        props.put("enable.auto.commit", "true");                 // commit offsets automatically
        props.put("auto.commit.interval.ms", "1000");            // auto-commit interval
        props.put("max.poll.records", "1000");                   // maximum records returned per poll
        props.put("session.timeout.ms", "10000");                // without a heartbeat for this long, the consumer is removed from the group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        // Kerberos authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "hadoop");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/var/krb5kdc/emr.keytab\" principal=\"xxxx/x.x.x.x@TBDS-XXXX\";");

        // try-with-resources closes the consumer both after the loop ends and on any exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Several topics may be listed here.
            consumer.subscribe(Arrays.asList("test_topic_001"));

            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(5000));
                if (records.isEmpty()) {
                    // An empty batch is treated as "nothing left to consume" and ends the example.
                    System.out.println("没有需要消费的数据了!");
                    break;
                }
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                Thread.sleep(5000L);
                System.out.println("处理了一批数据!");
            }
        }
    }
}

//编译
/usr/local/jdk/bin/javac -classpath kafka-clients-2.8.2.jar KafkaJavaApiConsumerClientExample.java
//打包
/usr/local/jdk/bin/jar -cvf KafkaJavaApiConsumerClientExample.jar KafkaJavaApiConsumerClientExample.class

//建议使用idea编译打包fat jar
//参考示例工程 kafka-examples/kafka-java-api-example 打包出 kafka-java-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean package

//运行
/usr/local/jdk/bin/java -classpath kafka-java-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar com.tencent.tbds.kafka.KafkaJavaApiConsumerClientExample

Scala api

生产者Scala api使用示例


//生产
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

/**
 * Minimal Kafka producer example: connects to a Kerberos-secured (SASL/GSSAPI)
 * cluster, sends string records to `test_topic_001` and prints the partition
 * and offset each record was written to.
 */
object KafkaScalaApiClientExample {

  /**
   * Builds the producer configuration, sends the records synchronously and
   * always closes the producer.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // JVM-wide Kerberos settings: JAAS login configuration and krb5 configuration.
    System.setProperty("java.security.auth.login.config", "/usr/local/service/kafka/config/kafka-gssapi-jaas.conf")
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf")

    val props: Properties = new Properties()
    props.put("bootstrap.servers", "x.x.x.x:9092") // Kafka cluster broker list
    props.put("acks", "all")
    props.put("retries", "1")                      // number of retries
    props.put("batch.size", "16384")               // batch size in bytes
    props.put("linger.ms", "1")                    // time to wait for a batch to fill
    props.put("buffer.memory", "33554432")         // RecordAccumulator buffer size
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Kerberos authentication
    props.put("security.protocol", "SASL_PLAINTEXT")
    props.put("sasl.mechanism", "GSSAPI")
    props.put("sasl.kerberos.service.name", "hadoop")
    props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/var/krb5kdc/emr.keytab\" principal=\"xxxx/x.x.x.x@TBDS-XXXX\";")

    val producer = new KafkaProducer[String, String](props)
    val topic = "test_topic_001"

    try {
      // `0 to 10` is inclusive, so eleven records are sent (keys "000" .. "0010").
      for (i <- 0 to 10) {
        val record = new ProducerRecord[String, String](topic, "00" + i, "11" + i)
        // Block once on the returned future and reuse the resulting metadata.
        val meta = producer.send(record).get()
        println(
          s"sent record(key=${record.key()} value=${record.value()}) " +
            s"meta(partition=${meta.partition()}, offset=${meta.offset()})"
        )
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      producer.close()
    }
  }
}

//编译
scalac -classpath kafka-clients-2.8.2.jar:slf4j-api-2.0.9.jar KafkaScalaApiClientExample.scala

//建议使用idea编译打包fat jar
//参考示例工程 kafka-examples/kafka-scala-api-example 打包出 kafka-scala-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean package

//运行
scala -cp .:kafka-scala-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar com.tencent.tbds.kafka.KafkaScalaApiClientExample

消费者Scala api使用示例


//消费
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

/**
 * Minimal Kafka consumer example: connects to a Kerberos-secured (SASL/GSSAPI)
 * cluster, subscribes to `test_topic_001` and prints every record it receives.
 */
object KafkaScalaApiConsumerClientExample {

  /**
   * Builds the consumer configuration and polls in an endless loop; the
   * consumer is closed when the loop is left through an exception.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // JVM-wide Kerberos settings: JAAS login configuration and krb5 configuration.
    System.setProperty("java.security.auth.login.config", "/usr/local/service/kafka/config/kafka-gssapi-jaas.conf")
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf")

    val props: Properties = new Properties()
    props.put("bootstrap.servers", "x.x.x.x:9092")   // Kafka cluster broker list
    props.put("group.id", "test-consumer-group111")  // consumers sharing a group.id form one consumer group
    props.put("enable.auto.commit", "true")          // commit offsets automatically
    props.put("auto.commit.interval.ms", "1000")     // auto-commit interval
    props.put("max.poll.records", "1000")            // maximum records returned per poll
    props.put("session.timeout.ms", "10000")         // without a heartbeat for this long, the consumer is removed from the group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")

    // Kerberos authentication
    props.put("security.protocol", "SASL_PLAINTEXT")
    props.put("sasl.mechanism", "GSSAPI")
    props.put("sasl.kerberos.service.name", "hadoop")
    props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/var/krb5kdc/emr.keytab\" principal=\"xxxx/x.x.x.x@TBDS-XXXX\";")

    // Explicit type arguments are required: without them Scala infers
    // KafkaConsumer[Nothing, Nothing], and reading a record's key or value
    // fails at runtime with a ClassCastException.
    val consumer = new KafkaConsumer[String, String](props)
    val topics = List("test_topic_001")

    try {
      consumer.subscribe(topics.asJava) // subscribe to the topics
      while (true) {
        // poll(Duration) replaces the deprecated poll(long) overload.
        val records = consumer.poll(java.time.Duration.ofMillis(10))
        for (record <- records.asScala) {
          println(
            s"Topic: ${record.topic()},Key: ${record.key()},Value: ${record.value()}" +
              s", Offset: ${record.offset()}, Partition: ${record.partition()}"
          )
        }
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      consumer.close()
    }
  }
}

//编译
scalac -classpath kafka-clients-2.8.2.jar:slf4j-api-2.0.9.jar KafkaScalaApiConsumerClientExample.scala

//建议使用idea编译打包fat jar
//参考示例工程 kafka-examples/kafka-scala-api-example 打包出 kafka-scala-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean package

//运行
scala -cp .:kafka-scala-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar com.tencent.tbds.kafka.KafkaScalaApiConsumerClientExample