api接口主要有:AdminClient (kafka 2.8.2 API)、KafkaProducer (kafka 2.8.2 API)、KafkaConsumer (kafka 2.8.2 API)
java api
生产者java api使用示例
//生产
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * Kafka producer example with Kerberos (GSSAPI) authentication.
 *
 * Sends 10 string records to topic "test_topic_001" and closes the producer.
 * Adjust the JAAS / krb5 config paths and the broker list for your cluster.
 */
public class KafkaJavaApiClientExample {
    public static void main(String[] args) {
        // Kerberos login configuration must be in place before the client is created.
        System.setProperty("java.security.auth.login.config", "/usr/local/service/kafka/config/kafka-gssapi-jaas.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "x.x.x.x:9092"); // Kafka cluster broker list
        props.put("acks", "all");                       // wait for acknowledgement from the full ISR
        props.put("retries", 1);                        // retry count for transient send failures
        props.put("batch.size", 16384);                 // batch size in bytes
        props.put("linger.ms", 1);                      // max wait before sending a partially-filled batch
        props.put("buffer.memory", 33554432);           // RecordAccumulator buffer size in bytes
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Kerberos authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "hadoop");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/var/krb5kdc/emr.keytab\" principal=\"xxxx/x.x.x.x@TBDS-XXXX\";");

        // Create the KafkaProducer client.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test_topic_001", "ImKey-" + i, "ImValue-" + i));
            }
        } finally {
            // close() flushes buffered records; the finally block guarantees the
            // producer is released even when a send throws (original leaked it).
            producer.close();
        }
    }
}
//编译
/usr/local/jdk/bin/javac -classpath kafka-clients-2.8.2.jar KafkaJavaApiClientExample.java
//打包
/usr/local/jdk/bin/jar -cvf KafkaJavaApiClientExample.jar KafkaJavaApiClientExample.class
//建议使用idea编译打包fat jar
//参考示例工程 kafka-examples/kafka-java-api-example 打包出 kafka-java-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean package
//运行
/usr/local/jdk/bin/java -classpath kafka-java-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar com.tencent.tbds.kafka.KafkaJavaApiClientExample
消费者java api使用示例
//消费
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Kafka consumer example with Kerberos (GSSAPI) authentication.
 *
 * Subscribes to "test_topic_001" and polls in a loop, printing each record;
 * the loop exits on the first empty batch, then the consumer is closed.
 */
public class KafkaJavaApiConsumerClientExample {
    public static void main(String[] args) throws InterruptedException {
        // Kerberos login configuration must be in place before the client is created.
        System.setProperty("java.security.auth.login.config", "/usr/local/service/kafka/config/kafka-gssapi-jaas.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "x.x.x.x:9092"); // Kafka cluster broker list
        props.put("group.id", "test-consumer-group111"); // consumers sharing a group.id form one consumer group
        props.put("enable.auto.commit", "true");         // commit offsets automatically
        props.put("auto.commit.interval.ms", "1000");    // auto-commit interval
        props.put("max.poll.records", "1000");           // max records returned per poll
        props.put("session.timeout.ms", "10000");        // no heartbeat within this window -> evicted from the group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        // Kerberos authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "hadoop");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/var/krb5kdc/emr.keytab\" principal=\"xxxx/x.x.x.x@TBDS-XXXX\";");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            // Multiple topics may be listed here.
            consumer.subscribe(Arrays.asList("test_topic_001"));
            while (true) {
                // poll(Duration) replaces the poll(long) overload deprecated since Kafka 2.0.
                ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(5000));
                if (records.isEmpty()) {
                    System.out.println("没有需要消费的数据了!");
                    break;
                }
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                Thread.sleep(5000L);
                System.out.println("处理了一批数据!");
            }
        } finally {
            // Release sockets and trigger a clean group leave (original never closed).
            consumer.close();
        }
    }
}
//编译
/usr/local/jdk/bin/javac -classpath kafka-clients-2.8.2.jar KafkaJavaApiConsumerClientExample.java
//打包
/usr/local/jdk/bin/jar -cvf KafkaJavaApiConsumerClientExample.jar KafkaJavaApiConsumerClientExample.class
//建议使用idea编译打包fat jar
//参考示例工程 kafka-examples/kafka-java-api-example 打包出 kafka-java-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean package
//运行
/usr/local/jdk/bin/java -classpath kafka-java-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar com.tencent.tbds.kafka.KafkaJavaApiConsumerClientExample

Scala api
生产者Scala api使用示例
//生产
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
/**
 * Kafka producer example (Scala) with Kerberos (GSSAPI) authentication.
 *
 * Sends 10 string records to topic "test_topic_001", printing partition and
 * offset metadata for each, then closes the producer in a finally block.
 */
object KafkaScalaApiClientExample {
  def main(args: Array[String]): Unit = {
    // Kerberos login configuration must be in place before the client is created.
    System.setProperty("java.security.auth.login.config", "/usr/local/service/kafka/config/kafka-gssapi-jaas.conf")
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf")

    val props: Properties = new Properties()
    props.put("bootstrap.servers", "x.x.x.x:9092") // Kafka cluster broker list
    props.put("acks", "all")                       // wait for acknowledgement from the full ISR (was set twice in the original)
    props.put("retries", "1")                      // retry count for transient send failures
    props.put("batch.size", "16384")               // batch size in bytes
    props.put("linger.ms", "1")                    // max wait before sending a partially-filled batch
    props.put("buffer.memory", "33554432")         // RecordAccumulator buffer size in bytes
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Kerberos authentication
    props.put("security.protocol", "SASL_PLAINTEXT")
    props.put("sasl.mechanism", "GSSAPI")
    props.put("sasl.kerberos.service.name", "hadoop")
    props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/var/krb5kdc/emr.keytab\" principal=\"xxxx/x.x.x.x@TBDS-XXXX\";")

    val producer = new KafkaProducer[String, String](props)
    val topic = "test_topic_001"
    try {
      // `until` sends exactly 10 records, matching the Java example (`to` sent 11).
      for (i <- 0 until 10) {
        val record = new ProducerRecord[String, String](topic, "00" + i, "11" + i)
        // Block once for the RecordMetadata instead of calling get() twice per record.
        val metadata = producer.send(record).get()
        // Plain format string: the original used an s-interpolator prefix with no
        // interpolations, which only obscured that these are printf %-specifiers.
        printf("sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
          record.key(), record.value(), metadata.partition(), metadata.offset())
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      producer.close()
    }
  }
}
//编译
scalac -classpath kafka-clients-2.8.2.jar:slf4j-api-2.0.9.jar KafkaScalaApiClientExample.scala
//建议使用idea编译打包fat jar
//参考示例工程 kafka-examples/kafka-scala-api-example 打包出 kafka-scala-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean package
//运行
scala -cp .:kafka-scala-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar com.tencent.tbds.kafka.KafkaScalaApiClientExample
消费者Scala api使用示例
//消费
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
/**
 * Kafka consumer example (Scala) with Kerberos (GSSAPI) authentication.
 *
 * Subscribes to "test_topic_001" and polls forever, printing each record;
 * the consumer is closed in the finally block if the loop ever terminates.
 */
object KafkaScalaApiConsumerClientExample {
  def main(args: Array[String]): Unit = {
    // Kerberos login configuration must be in place before the client is created.
    System.setProperty("java.security.auth.login.config", "/usr/local/service/kafka/config/kafka-gssapi-jaas.conf")
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf")

    val props: Properties = new Properties()
    props.put("bootstrap.servers", "x.x.x.x:9092") // Kafka cluster broker list
    props.put("group.id", "test-consumer-group111") // consumers sharing a group.id form one consumer group
    props.put("enable.auto.commit", "true")         // commit offsets automatically
    props.put("auto.commit.interval.ms", "1000")    // auto-commit interval
    props.put("max.poll.records", "1000")           // max records returned per poll
    props.put("session.timeout.ms", "10000")        // no heartbeat within this window -> evicted from the group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")

    // Kerberos authentication
    props.put("security.protocol", "SASL_PLAINTEXT")
    props.put("sasl.mechanism", "GSSAPI")
    props.put("sasl.kerberos.service.name", "hadoop")
    props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/var/krb5kdc/emr.keytab\" principal=\"xxxx/x.x.x.x@TBDS-XXXX\";")

    // Explicit type parameters: `new KafkaConsumer(props)` infers [Nothing, Nothing],
    // losing the String key/value typing the deserializers provide.
    val consumer = new KafkaConsumer[String, String](props)
    val topics = List("test_topic_001")
    try {
      consumer.subscribe(topics.asJava) // subscribe; multiple topics may be listed
      while (true) {
        // poll(Duration) replaces the poll(long) overload deprecated since Kafka 2.0.
        val records = consumer.poll(java.time.Duration.ofMillis(10))
        for (record <- records.asScala) {
          println("Topic: " + record.topic() +
            ",Key: " + record.key() +
            ",Value: " + record.value() +
            ", Offset: " + record.offset() +
            ", Partition: " + record.partition())
        }
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      consumer.close()
    }
  }
}
//编译
scalac -classpath kafka-clients-2.8.2.jar:slf4j-api-2.0.9.jar KafkaScalaApiConsumerClientExample.scala
//建议使用idea编译打包fat jar
//参考示例工程 kafka-examples/kafka-scala-api-example 打包出 kafka-scala-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean package
//运行
scala -cp .:kafka-scala-api-example-1.0-SNAPSHOT-jar-with-dependencies.jar com.tencent.tbds.kafka.KafkaScalaApiConsumerClientExample
