Python SDK
最近更新时间: 2024-06-12 15:06:00
操作场景
本文以调用 Python SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
操作步骤
准备环境。在客户端环境安装 pulsar-client 库,可以使用 pip 进行安装,也可以使用其他方式,参见 Pulsar Python client。
pip install pulsar-client==2.8.1
创建客户端。
# 创建客户端 client = pulsar.Client( authentication=pulsar.AuthenticationToken( # 已授权角色密钥 AUTHENTICATION), # 服务接入地址 service_url=SERVICE_URL, # 接入点路由ID listener_name=ROUTERID)
参数 说明 SERVICE_URL 集群接入地址,可以在控制台集群管理接入点页面查看并复制。 AUTHENTICATION 角色密钥,在角色管理页面复制密钥列复制。 ROUTERID 创建生产者。
# 创建生产者 producer = client.create_producer( # topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制 topic='pulsar-xxx/sdk_python/topic1' )
说明:
Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。
发送消息。
# 发送消息 producer.send( # 消息内容 'Hello python client, this is a msg.'.encode('utf-8'), # 消息参数 properties={'k': 'v'}, # 业务key partition_key='yourKey' )
还可以使用异步方式发送消息。
# 异步发送回调 def send_callback(send_result, msg_id): print('Message published: result:{} msg_id:{}'.format(send_result, msg_id)) # 发送消息 producer.send_async( # 消息内容 'Hello python client, this is a async msg.'.encode('utf-8'), # 异步回调 callback=send_callback, # 消息配置 properties={'k': 'v'}, # 业务key partition_key='yourKey' )
创建消费者。
# 订阅消息 consumer = client.subscribe( # topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制 topic='pulsar-xxx/sdk_python/topic1', # 订阅名称 subscription_name='sub_topic1' )
说明:
- subscriptionName 需要写入订阅名,可在消费管理界面查看。
- Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。
消费消息。
# 获取消息 msg = consumer.receive() try: # 模拟业务 print("Received message '{}' id='{}'".format(msg.data(), msg.message_id())) # 消费成功,回复ack consumer.acknowledge(msg) except: # 消费失败,消息将会重新投递 consumer.negative_acknowledge(msg)
登录 TDMQ Pulsar 版控制台,依次点击 Topic 管理 > Topic 名称进入消费管理页面,点开订阅名下方右三角号,可查看生产消费记录。
说明:
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 Demo 或 Pulsar 官方文档。