01 VPC 网络接入

最近更新时间: 2024-10-17 17:10:00

操作背景

该任务以 Python 客户端为例指导您使用VPC网络接入消息队列 CKafka 并收发消息。

前提条件

操作步骤

将下载的demo中的pythonkafkademo上传至linux服务器,登陆linux服务器,进入pythonkafkademo目录。

步骤一:添加 Python 依赖库

执行以下命令安装:

pip install kafka-python

步骤二:生产消息

  1. 修改生产消息程序producer.py中配置参数。
#coding:utf8
from kafka import KafkaProducer

producer = KafkaProducer(
   bootstrap_servers = ['$domainName:$port'],
   api_version = (0,10,0)
)
message = "Hello World! Hello Ckafka!"
msg = json.dumps(message).encode()
producer.send('topic_name',value = msg)
print("produce message " + message + " success.");
producer.close()
参数 描述
bootstrap_servers 接入网络,在控制台的实例详情页面【接入方式】模块的网络列复制。
img
topic_name Topic名称,您可以在控制台上【topic管理】页面复制。img
  1. 编译并运行producer.py。

  2. 查看运行结果。

  1. 在 CKafka 控制台的【topic管理】页面,选择对应的 Topic , 点击【更多】>【消息查询】,查看刚刚发送的消息。

步骤三:消费消息

  1. 修改消费消息程序consumer.py中配置参数。
#coding:utf8
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    '$topic_name',
    group_id = "$group_id",
    bootstrap_servers = ['$domainName:$port'],
    api_version = (0,10,0)
)

for message in consumer:
    print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))
参数 描述
bootstrap_servers 接入网络,在控制台的实例详情页面【接入方式】模块的网络列复制。img
group_id 消费者的组 ID,根据业务需求自定义
topic_name Topic名称,您可以在控制台上【topic管理】页面复制。img
  1. 编译并运行consumer.py。

  2. 查看运行结果。

  1. 在 CKafka 控制台的【Consumer Group】页面,选择对应的消费组名称,在主题名称输入 topic 名称,点击【查询详情】,查看消费详情。