01 VPC 网络接入

最近更新时间: 2024-06-12 15:06:00

操作背景

该任务以 Python 客户端为例指导您使用VPC网络接入消息队列 CKafka 并收发消息。

前提条件

操作步骤

将下载的demo中的pythonkafkademo上传至linux服务器,登陆linux服务器,进入pythonkafkademo目录。

步骤一:添加 Python 依赖库

执行以下命令安装: 、

pip install kafka-python

步骤二:生产消息

  1. 修改生产消息程序producer.py中配置参数。

    #coding:utf8
    from kafka import KafkaProducer
    
    producer = KafkaProducer(
      bootstrap_servers = ['$domainName:$port'],
      api_version = (0,10,0)
    )
    message = "Hello World! Hello Ckafka!"
    msg = json.dumps(message).encode()
    producer.send('topic_name',value = msg)
    print("produce message " + message + " success.");
    producer.close()
    参数 描述
    bootstrap_servers 接入网络,在控制台的实例详情页面【接入方式】模块的网络列复制。
    img
    topic_name Topic名称,您可以在控制台上【topic管理】页面复制。img
  2. 编译并运行producer.py。

  3. 查看运行结果。

4. 在 CKafka 控制台的【topic管理】页面,选择对应的 Topic , 点击【更多】>【消息查询】,查看刚刚发送的消息。

步骤三:消费消息

  1. 修改消费消息程序consumer.py中配置参数。

    #coding:utf8
    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
       '$topic_name',
       group_id = "$group_id",
       bootstrap_servers = ['$domainName:$port'],
       api_version = (0,10,0)
    )
    
    for message in consumer:
       print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))
    参数 描述
    bootstrap_servers 接入网络,在控制台的实例详情页面【接入方式】模块的网络列复制。img
    group_id 消费者的组 ID,根据业务需求自定义
    topic_name Topic名称,您可以在控制台上【topic管理】页面复制。img
  2. 编译并运行consumer.py。

  3. 查看运行结果。

  4. 在 CKafka 控制台的【Consumer Group】页面,选择对应的消费组名称,在主题名称输入 topic 名称,点击【查询详情】,查看消费详情。