C++ SDK
最近更新时间: 2024-06-12 15:06:00
操作场景
本文以调用 C++ SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
操作步骤
准备环境。
在客户端环境安装 Pulsar C++ client,安装过程可参考官方教程 Pulsar C++ client。
在项目中引入 Pulsar C++ client 相关头文件及动态库。
创建客户端。
// 客户端配置信息 ClientConfiguration config; // 设置授权角色密钥 AuthenticationPtr auth = pulsar::AuthToken::createWithToken(AUTHENTICATION); std::string routerID = ROUTERID; config.setAuth(auth); config.setListenerName(routerID); // 创建客户端 Client client(SERVICE_URL, config);
参数 说明 SERVICE_URL 集群接入地址,可以在控制台集群管理接入点页面查看并复制。 AUTHENTICATION
角色密钥,在角色管理页面复制密钥列复制。ROUTERID 创建生产者。
// 生产者配置 ProducerConfiguration producerConf; producerConf.setBlockIfQueueFull(true); producerConf.setSendTimeout(5000); // 生产者 Producer producer; // 创建生产者 Result result = client.createProducer( // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称 "persistent://pulsar-xxx/sdk_cpp/topic1", producerConf, producer); if (result != ResultOk) { std::cout << "Error creating producer: " << result << std::endl; return -1; }
说明:
Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。
发送消息。
// 消息内容 std::string content = "hello cpp client, this is a msg"; // 构建消息对象 Message msg = MessageBuilder().setContent(content) .setPartitionKey("mykey") // 业务key .setProperty("x", "1") // 设置消息参数 .build(); // 发送消息 Result result = producer.send(msg); if (result != ResultOk) { // 发送失败 std::cout << "The message " << content << " could not be sent, received code: " << result << std::endl; } else { // 发送成功 std::cout << "The message " << content << " sent successfully" << std::endl; }
创建消费者。
// 消费者配置信息 ConsumerConfiguration consumerConfiguration; consumerConfiguration.setSubscriptionInitialPosition(pulsar::InitialPositionEarliest); // 消费者 Consumer consumer; // 订阅topic Result result = client.subscribe( // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称 "persistent://pulsar-xxx/sdk_cpp/topic1", // 订阅名称 "sub_topic1", consumerConfiguration, consumer); if (result != ResultOk) { std::cout << "Failed to subscribe: " << result << std::endl; return -1; }
说明:
- subscriptionName 需要写入订阅名,可在消费管理界面查看。
- Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。
消费消息。
Message msg; // 获取消息 consumer.receive(msg); // 模拟业务 std::cout << "Received: " << msg << " with payload '" << msg.getDataAsString() << "'" << std::endl; // 回复ack consumer.acknowledge(msg); // 消费失败回复nack, 消息将会重新投递 // consumer.negativeAcknowledge(msg);
登录 TDMQ Pulsar 版控制台,依次点击 Topic 管理 > Topic 名称进入消费管理页面,点开订阅名下方右三角号,可查看生产消费记录。
说明:
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 Demo 或 Pulsar 官方文档。