01 VPC 网络接入
最近更新时间: 2024-06-12 15:06:00
操作背景
该任务以 Python 客户端为例指导您使用VPC网络接入消息队列 CKafka 并收发消息。
前提条件
操作步骤
将下载的demo中的pythonkafkademo上传至linux服务器,登陆linux服务器,进入pythonkafkademo目录。
步骤一:添加 Python 依赖库
执行以下命令安装: 、
pip install kafka-python
步骤二:生产消息
修改生产消息程序producer.py中配置参数。
#coding:utf8 from kafka import KafkaProducer producer = KafkaProducer( bootstrap_servers = ['$domainName:$port'], api_version = (0,10,0) ) message = "Hello World! Hello Ckafka!" msg = json.dumps(message).encode() producer.send('topic_name',value = msg) print("produce message " + message + " success."); producer.close()
参数 描述 bootstrap_servers 接入网络,在控制台的实例详情页面【接入方式】模块的网络列复制。 topic_name Topic名称,您可以在控制台上【topic管理】页面复制。 编译并运行producer.py。
查看运行结果。
4. 在 CKafka 控制台的【topic管理】页面,选择对应的 Topic , 点击【更多】>【消息查询】,查看刚刚发送的消息。
步骤三:消费消息
修改消费消息程序consumer.py中配置参数。
#coding:utf8 from kafka import KafkaConsumer consumer = KafkaConsumer( '$topic_name', group_id = "$group_id", bootstrap_servers = ['$domainName:$port'], api_version = (0,10,0) ) for message in consumer: print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))
参数 描述 bootstrap_servers 接入网络,在控制台的实例详情页面【接入方式】模块的网络列复制。 group_id 消费者的组 ID,根据业务需求自定义 topic_name Topic名称,您可以在控制台上【topic管理】页面复制。 编译并运行consumer.py。
查看运行结果。
在 CKafka 控制台的【Consumer Group】页面,选择对应的消费组名称,在主题名称输入 topic 名称,点击【查询详情】,查看消费详情。