Kafka 协议消费

最近更新时间: 2026-03-13 09:03:00

使用 Kafka 协议消费功能,您可以将一个日志主题,当作一个 Kafka Topic 来消费。可将采集到 CLS 的日志数据,消费到下游的大数据组件或者数据仓库。
本文提供了开源组件 Flink、Flume、Logstash消费日志主题的Demo。

前提条件

  • 已开通日志服务,创建日志集与日志主题,并成功采集到日志数据。
  • 确保当前操作账号拥有开通 Kafka 协议消费的权限。

操作步骤

  1. 登录日志服务控制台。
  2. 在左侧导航栏中,选择日志主题
  3. 在日志主题页面,单击需要使用 Kafka 协议消费的日志主题 ID/名称,进入日志主题管理页面。
  4. 在日志主题管理页面中,单击Kafka 协议消费页签。
  5. 单击右侧的编辑,将“当前状态”的开关按钮设置为打开状态后,单击确定
  6. 控制台给出 Topic、Host+Port 的信息。用户可以复制信息,构造消费者。

消费者参数说明

Kafka 协议消费者的参数说明如下:

参数 说明
用户认证方式 目前仅支持 SASL_PLAINTEXT。
topic CLS kafka 协议消费控制台给出的主题名称,单击旁边按钮可以复制。例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX。

username
配置为${LogSetID},即日志集 ID。 例如:0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXXXX ,在日志主题列表中可以复制日志集 ID。
password 配置为${SecretId}#${SecretKey}。例如:XXXXXXXXXXXXXX#YYYYYYYY,请登录访问管理控制台,在左侧导航栏中单击云API密钥,API 密钥或者项目密钥均可使用,建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。

注意:

下面的例子中的代码,jaas.config 的配置,${SecretId}#${SecretKey}后有(;分号),不要漏填,否则会报错。

Python SDK

import uuid
from kafka import KafkaConsumer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'您的消费主题',  
group_id = uuid.uuid4().hex,
auto_offset_reset='earliest',
#服务地址+端口,内网端口9095,例子是内网消费,请根据您的实际情况填写
bootstrap_servers = ['tcloud-cls-kafka-access-consumer.{{regionName}}.{{domainMain}}:9095'],
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = 'PLAIN',   
#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6  
sasl_plain_username = "${logsetID}",
#密码是用户的SecretId#SecretKey组合的字符串,例如AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可.
sasl_plain_password = "${SecretId}#${SecretKey}",
api_version = (1,1,1)
)
print('begin')
for message in consumer:
    print('begins')
    print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))
    print('end')

Logstash 消费 CLS 日志

input {
    kafka {
        #cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
        topics => "您的消费主题"
        #服务地址+端口,内网端口9095,例子是内网消费,请根据您的实际情况填写
        bootstrap_servers => "tcloud-cls-kafka-access-consumer.{{regionName}}.{{domainMain}}:9095"
        group_id => "您的消费组id"
        security_protocol => "SASL_PLAINTEXT"
        sasl_mechanism => "PLAIN"
        #用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6 
        #密码是用户的SecretId#SecretKey组合的字符串,例如AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可.
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${securityID}#${securityKEY}';"
    }
    
}
output {
  stdout { codec => json }
}