日志服务(Cloud Log Service,CLS)目前已支持使用 Kafka Producer SDK 和其他 Kafka 相关 agent 上传日志到 CLS。
使用场景
日志应用中使用 Kafka 作为消息管道是非常普遍的场景。如通过机器上的开源采集客户端或者使用 producer 直接写入日志,再通过消费管道提供给下游如 spark、flink 等进行消费。CLS 具备完整的 Kafka 数据管道上下行能力,以下主要介绍哪些场景适合您使用 Kafka 协议上传日志。
- 场景1:您已有基于开源采集的自建系统,不希望有复杂的二次改造,您可以通过修改配置文件将日志上传到 CLS。
例如,您之前使用 ELK 搭建日志系统的客户,现在只需要通过修改 Filebeat 或者 Logstash 的配置文件,将 Output 配置(详情请参见 filebeat 配置)到 CLS,即可非常方便简洁地将日志上传。 - 场景2:您希望通过 Kafka producer SDK 来采集日志并上传,不必再安装采集 Agent。
- CLS 支持您使用各类 Kafka producer SDK 采集日志,并通过 Kafka 协议上传到 CLS。(详情请参见本文提供的 SDK 调用示例 )
相关限制
- 支持 Kafka 协议版本为:0.11.0.X,1.0.X,1.1.X,2.0.X,2.1.X,2.2.X,2.3.X,2.4.X,2.5.X,2.6.X,2.7.X,2.8.X。
- 支持压缩方式:gzip,snappy,lz4。
- 当前使用 SASL_PLAINTEXT 认证。
- 使用 Kafka 协议上传需要配置 RealtimeProducer 权限。
配置方式
使用 Kafka 协议上传日志至 CLS Kafka 生产端时,需要配置下 CLS Kafka 访问信息:
| 参数 | 说明 |
|---|---|
| 鉴权机制 | 当前支持 SASL_PLAINTEXT |
| hosts | CLS Kafka 地址,根据目标写入日志主题所在地域配置。 |
| topic | CLS Kafka topic 名称,配置为日志主题 ID。例如:76c63473-c496-466b-XXXX-XXXXXXXXXXXX |
| username | CLS Kafka 访问用户名,配置为日志集 ID。 例如:0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX |
| password | - CLS Kafka 访问密码,格式为 ${SecurityId}#${SecurityKey}。例如:XXXXXXXXXXXXXX#YYYYYYYY。 - 若要匿名写入,格式为 topic_id#${日志主题 ID}。例如:topic_id#76c63473-c496-466b-XXXX-XXXX。 注意: 若使用匿名写入, 目标日志主题需开启匿名访问, 并在匿名操作选择 Kafka 协议上传日志。 详情请参考 管理日志主题。 |
服务入口
注意:
本文档以城市1地域为例,内外网域名需使用不同端口标识,其他地域请替换地址前缀。
host(服务入口)填写样例:
地域 |
网络类型 | 端口号 | 服务入口 |
|---|---|---|---|
| 城市1 | 内网 | 9095 | tcloud-cls-kafka-access.{{regionName}}.{{domainMain}} |
示例
Beat 调用示例
filebeat/winlogbeat 配置
output.kafka:
enabled: true
hosts: ["tcloud-cls-kafka-access.{{regionName}}.{{domainMain}}:9095"] # TODO 服务地址;内网端口9095
topic: "${ClsTopicID}" # TODO 日志主题ID
version: "0.11.0.2"
compression: "${compress}" # TODO 配置压缩方式,支持gzip,snappy,lz4;例如"lz4"
username: "${ClslogsetID}" # TODO 日志集ID
# 若要匿名写入,password: "topic_id#${日志主题 ID}"
password: "${SecurityId}#${SecurityKey}"
Logstash 调用示例
logstash 配置
output {
kafka {
topic_id => "${ClstopicID}"
bootstrap_servers => "tcloud-cls-kafka-access.{{regionName}}.{{domainMain}}:${port}"
sasl_mechanism => "PLAIN"
security_protocol => "SASL_PLAINTEXT"
compression_type => "${compress}"
# 若要匿名写入,password='topic_id#${日志主题 ID}'
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${ClslogsetID}' password='${securityID}#${securityKEY}';"
codec => json
}
}
SDK 调用示例
Golang SDK 调用示例
以下以 sarama.V1_1_0_0为例,其他版本按照类似规则配置:
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Net.SASL.Mechanism = "PLAIN"
config.Net.SASL.Version = int16(1)
config.Net.SASL.Enable = true
// TODO 日志集 ID
config.Net.SASL.User = "${logsetID}"
// TODO 格式为 ${SecurityId}#${SecurityKey},若要匿名写入,格式为 topic_id#${日志主题 ID}
config.Net.SASL.Password = "${SecurityId}#${SecurityKey}"
config.Producer.Return.Successes = true
// TODO 根据使用场景选择acks的值
config.Producer.RequiredAcks = ${acks}
config.Version = sarama.V1_1_0_0
// TODO 配置压缩方式
config.Producer.Compression = ${compress}
// TODO 服务地址;内网端口9095
producer, err := sarama.NewSyncProducer([]string{"tcloud-cls-kafka-access.{{regionName}}.{{domainMain}}:9095"}, config)
if err != nil {
panic(err)
}
msg := &sarama.ProducerMessage{
Topic: "${topicID}", // TODO 日志主题ID
Value: sarama.StringEncoder("goland sdk sender demo"),
}
// 发送消息
for i := 0; i <= 5; i++ {
partition, offset, err := producer.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Printf("send response; partition:%d, offset:%d\n", partition, offset)
}
_ = producer.Close()
}
Python SDK 调用示例
from kafka import KafkaProducer
if __name__ == '__main__':
produce = KafkaProducer(
# TODO 服务地址;内网端口9095
bootstrap_servers=["tcloud-cls-kafka-access.{{regionName}}.{{domainMain}}:9095"],
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='PLAIN',
# TODO 日志集 ID
sasl_plain_username='${logsetID}',
# TODO 格式为 ${SecurityId}#${SecurityKey}, 若要匿名写入,格式为 topic_id#${日志主题 ID}
sasl_plain_password='${SecurityId}#${SecurityKey}',
api_version=(0, 11, 0),
# TODO 配置压缩方式
compression_type="${compress_type}",
)
for i in range(0, 5):
# 发送消息 TODO 日志主题ID
future = produce.send(topic="${topicID}", value=b'python sdk sender demo')
result = future.get(timeout=10)
print(result)
Java SDK 调用示例
maven 依赖:
<dependencies>
<!--https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
</dependency>
</dependencies>
代码示例:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
// 0.配置一系列参数
Properties props = new Properties();
// TODO 使用时
props.put("bootstrap.servers", "tcloud-cls-kafka-access.{{regionName}}.{{domainMain}}:9095");
// TODO 以下值根据业务场景设置
props.put("acks", ${acks});
props.put("retries", ${retries});
props.put("batch.size", ${batch.size});
props.put("linger.ms", ${linger.ms});
props.put("buffer.memory", ${buffer.memory});
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "${compress_type}"); // TODO 配置压缩方式
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
// TODO 用户名为logsetID;密码为securityID和securityKEY的组合securityID#securityKEY,格式为 ${SecurityId}#${SecurityKey},
// 若要匿名写入,密码为 topic_id#${日志主题 ID}
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${SecurityId}#${SecurityKey}';");
// 1.创建一个生产者对象
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 2.调用send方法 TODO 日志主题ID
Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("${topicID}", ${message}));
RecordMetadata recordMetadata = meta.get(${timeout}, TimeUnit.MILLISECONDS);
System.out.println("offset = " + recordMetadata.offset());
// 3.关闭生产者
producer.close();
}
}
C SDK 调用示例
// https://github.com/edenhill/librdkafka - master
#include <iostream>
#include <librdkafka/rdkafka.h>
#include <string>
#include <unistd.h>
#define BOOTSTRAP_SERVER "tcloud-cls-kafka-access.{{regionName}}.{{domainMain}}:${port}"
// USERNAME 为日志集 ID
#define USERNAME "${logsetID}"
// PASSWORD 格式为 ${SecurityId}#${SecurityKey}, 若要匿名写入,格式为 topic_id#${日志主题 ID}
#define PASSWORD "${SecurityId}#${SecurityKey}"
// 日志主题ID
#define TOPIC "${topicID}"
#define ACKS "${acks}"
// 配置压缩方式
#define COMPRESS_TYPE "${compress_type}"
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err) {
fprintf(stdout, "%% Message delivery failed : %s\n", rd_kafka_err2str(rkmessage->err));
} else {
fprintf(stdout, "%% Message delivery successful %zu:%d\n", rkmessage->len, rkmessage->partition);
}
}
int main(int argc, char **argv) {
// 1. 初始化配置
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
char errstr[512];
if (rd_kafka_conf_set(conf, "bootstrap.servers", BOOTSTRAP_SERVER, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\n", errstr);
return -1;
}
if (rd_kafka_conf_set(conf, "acks", ACKS, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\n", errstr);
return -1;
}
if (rd_kafka_conf_set(conf, "compression.codec", COMPRESS_TYPE, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\n", errstr);
return -1;
}
// 设置认证方式
if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\n", errstr);
return -1;
}
if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\n", errstr);
return -1;
}
if (rd_kafka_conf_set(conf, "sasl.username", USERNAME, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\n", errstr);
return -1;
}
if (rd_kafka_conf_set(conf, "sasl.password", PASSWORD, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "%s\n", errstr);
return -1;
}
// 2. 创建 handler
rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
rd_kafka_conf_destroy(conf);
fprintf(stdout, "create produce handler failed: %s\n", errstr);
return -1;
}
// 3. 发送数据
std::string value = "test lib kafka ---- ";
for (int i = 0; i < 100; ++i) {
retry:
rd_kafka_resp_err_t err = rd_kafka_producev(
rk, RD_KAFKA_V_TOPIC(TOPIC),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE((void *) value.c_str(), value.size()),
RD_KAFKA_V_OPAQUE(nullptr), RD_KAFKA_V_END);
if (err) {
fprintf(stdout, "Failed to produce to topic : %s, error : %s", TOPIC, rd_kafka_err2str(err));
if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
rd_kafka_poll(rk, 1000);
goto retry;
}
} else {
fprintf(stdout, "send message to topic successful : %s\n", TOPIC);
}
rd_kafka_poll(rk, 0);
}
std::cout << "message flush final" << std::endl;
rd_kafka_flush(rk, 10 * 1000);
if (rd_kafka_outq_len(rk) > 0) {
fprintf(stdout, "%d message were not deliverer\n", rd_kafka_outq_len(rk));
}
rd_kafka_destroy(rk);
return 0;
}
C# SDK 调用示例
/*
* 该demo只提供了最简单的使用方法,具体生产还需要结合调用放来实现
* 在使用过程中,demo中留的todo项需要替换使用
*
* 注意:
* 1. 该Demo基于Confluent.Kafka/1.8.2版本验证通过
* 2. MessageMaxBytes最大值不能超过5M
* 3. 该demo使用同步的方式生产,在使用时也可根据业务场景调整为异步的方式
* 4. 其他参数在使用过程中可以根据业务参考文档自己调整:https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ProducerConfig.html
*
* Confluent.Kafka 参考文档:https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.html
*/
using Confluent.Kafka;
namespace Producer
{
class Producer
{
private static void Main(string[] args)
{
var config = new ProducerConfig
{
// Kafka 填写,注意内网端口9095
BootstrapServers = "${domain}:${port}",
SaslMechanism = SaslMechanism.Plain,
// TODO topic所属日志集ID
SaslUsername = "${logsetID}",
// TODO topic所属uin的密钥,格式为 ${SecurityId}#${SecurityKey},
// 若要匿名写入,格式为 topic_id#${日志主题 ID}
SaslPassword = "${SecurityId}#${SecurityKey}",
SecurityProtocol = SecurityProtocol.SaslPlaintext,
// TODO 根据实际使用场景赋值。可取值: Acks.None、Acks.Leader、Acks.All
Acks = Acks.None,
// TODO 请求消息的最大大小,最大不能超过5M
MessageMaxBytes = 5242880
};
// deliveryHandler
Action<DeliveryReport<Null, string>> handler =
r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}");
using (var produce = new ProducerBuilder<Null, string>(config).Build())
{
try
{
// TODO 测试验证代码
for (var i = 0; i < 100; i++)
{
// TODO 替换日志主题ID
produce.Produce("${topicID}", new Message<Null, string> { Value = "C# demo value" }, handler);
}
produce.Flush(TimeSpan.FromSeconds(10));
}
catch (ProduceException<Null, string> pe)
{
Console.WriteLine($"send message receiver error : {pe.Error.Reason}");
}
}
}
}
}