01 VPC 网络接入
最近更新时间: 2024-10-17 17:10:00
操作背景
该任务以 Python 客户端为例指导您使用VPC网络接入消息队列 CKafka 并收发消息。
前提条件
操作步骤
将下载的demo中的pythonkafkademo上传至linux服务器,登陆linux服务器,进入pythonkafkademo目录。
步骤一:添加 Python 依赖库
执行以下命令安装:
、
pip install kafka-python
步骤二:生产消息
- 修改生产消息程序producer.py中配置参数。
#coding:utf8
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers = ['$domainName:$port'],
api_version = (0,10,0)
)
message = "Hello World! Hello Ckafka!"
msg = json.dumps(message).encode()
producer.send('topic_name',value = msg)
print("produce message " + message + " success.");
producer.close()
参数 | 描述 |
---|---|
bootstrap_servers | 接入网络,在控制台的实例详情页面【接入方式】模块的网络列复制。 |
topic_name | Topic名称,您可以在控制台上【topic管理】页面复制。 |
编译并运行producer.py。
查看运行结果。
- 在 CKafka 控制台的【topic管理】页面,选择对应的 Topic , 点击【更多】>【消息查询】,查看刚刚发送的消息。
步骤三:消费消息
- 修改消费消息程序consumer.py中配置参数。
#coding:utf8
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'$topic_name',
group_id = "$group_id",
bootstrap_servers = ['$domainName:$port'],
api_version = (0,10,0)
)
for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))
参数 | 描述 |
---|---|
bootstrap_servers | 接入网络,在控制台的实例详情页面【接入方式】模块的网络列复制。 |
group_id | 消费者的组 ID,根据业务需求自定义 |
topic_name | Topic名称,您可以在控制台上【topic管理】页面复制。 |
编译并运行consumer.py。
查看运行结果。
- 在 CKafka 控制台的【Consumer Group】页面,选择对应的消费组名称,在主题名称输入 topic 名称,点击【查询详情】,查看消费详情。