使用 Kafka 协议消费功能,您可以将一个日志主题,当作一个 Kafka Topic 来消费。可将采集到 CLS 的日志数据,消费到下游的大数据组件或者数据仓库。
本文提供了开源组件 Flink、Flume、Logstash消费日志主题的Demo。
前提条件
- 已开通日志服务,创建日志集与日志主题,并成功采集到日志数据。
- 确保当前操作账号拥有开通 Kafka 协议消费的权限。
操作步骤
- 登录日志服务控制台。
- 在左侧导航栏中,选择日志主题。
- 在日志主题页面,单击需要使用 Kafka 协议消费的日志主题 ID/名称,进入日志主题管理页面。
- 在日志主题管理页面中,单击Kafka 协议消费页签。
- 单击右侧的编辑,将“当前状态”的开关按钮设置为打开状态后,单击确定。
- 控制台给出 Topic、Host+Port 的信息。用户可以复制信息,构造消费者。
消费者参数说明
Kafka 协议消费者的参数说明如下:
| 参数 | 说明 |
|---|---|
| 用户认证方式 | 目前仅支持 SASL_PLAINTEXT。 |
| topic | CLS kafka 协议消费控制台给出的主题名称,单击旁边按钮可以复制。例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX。 |
username |
配置为${LogSetID},即日志集 ID。 例如:0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX ,在日志主题列表中可以复制日志集 ID。 |
| password | 配置为${SecretId}#${SecretKey}。例如:XXXXXXXXXXXXXX#YYYYYYYY,请登录访问管理控制台,在左侧导航栏中单击云API密钥,API 密钥或者项目密钥均可使用,建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。 |
注意:
下面的例子中的代码,jaas.config 的配置,
${SecretId}#${SecretKey}后有(;分号),不要漏填,否则会报错。
Python SDK
import uuid
from kafka import KafkaConsumer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'您的消费主题',
group_id = uuid.uuid4().hex,
auto_offset_reset='earliest',
#服务地址+端口,内网端口9095,例子是内网消费,请根据您的实际情况填写
bootstrap_servers = ['tcloud-cls-kafka-access-consumer.{{regionName}}.{{domainMain}}:9095'],
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = 'PLAIN',
#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
sasl_plain_username = "${logsetID}",
#密码是用户的SecretId#SecretKey组合的字符串,例如AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可.
sasl_plain_password = "${SecretId}#${SecretKey}",
api_version = (1,1,1)
)
print('begin')
for message in consumer:
print('begins')
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))
print('end')
Logstash 消费 CLS 日志
input {
kafka {
#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
topics => "您的消费主题"
#服务地址+端口,内网端口9095,例子是内网消费,请根据您的实际情况填写
bootstrap_servers => "tcloud-cls-kafka-access-consumer.{{regionName}}.{{domainMain}}:9095"
group_id => "您的消费组id"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
#密码是用户的SecretId#SecretKey组合的字符串,例如AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可.
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${securityID}#${securityKEY}';"
}
}
output {
stdout { codec => json }
}