C++ SDK

最近更新时间: 2024-06-12 15:06:00

操作场景

本文以调用 C++ SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

操作步骤

  1. 准备环境。

    1. 在客户端环境安装 Pulsar C++ client,安装过程可参考官方教程 Pulsar C++ client

    2. 在项目中引入 Pulsar C++ client 相关头文件及动态库。

  2. 创建客户端。

    // 客户端配置信息
    ClientConfiguration config;
    // 设置授权角色密钥
    AuthenticationPtr auth = pulsar::AuthToken::createWithToken(AUTHENTICATION);
    std::string routerID = ROUTERID;
    config.setAuth(auth);
    config.setListenerName(routerID);
    // 创建客户端
    Client client(SERVICE_URL, config);
    参数 说明
    SERVICE_URL 集群接入地址,可以在控制台集群管理接入点页面查看并复制。

    AUTHENTICATION
    角色密钥,在角色管理页面复制密钥列复制。
    ROUTERID
  3. 创建生产者。

    // 生产者配置
    ProducerConfiguration producerConf;
    producerConf.setBlockIfQueueFull(true);
    producerConf.setSendTimeout(5000);
    // 生产者
    Producer producer;
    // 创建生产者
    Result result = client.createProducer(
        // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
        "persistent://pulsar-xxx/sdk_cpp/topic1",
        producerConf,
        producer);
    if (result != ResultOk) {
        std::cout << "Error creating producer: " << result << std::endl;
        return -1;
    }

    说明:

    Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。

  4. 发送消息。

    // 消息内容
    std::string content = "hello cpp client, this is a msg";
    // 构建消息对象
    Message msg = MessageBuilder().setContent(content)
        .setPartitionKey("mykey")  // 业务key
        .setProperty("x", "1")   // 设置消息参数
        .build();
    // 发送消息
    Result result = producer.send(msg);
    if (result != ResultOk) {
        // 发送失败
        std::cout << "The message " << content << " could not be sent, received code: " << result << std::endl;
    } else {
        // 发送成功
        std::cout << "The message " << content << " sent successfully" << std::endl;
    }
  5. 创建消费者。

    // 消费者配置信息
    ConsumerConfiguration consumerConfiguration;
    consumerConfiguration.setSubscriptionInitialPosition(pulsar::InitialPositionEarliest);
    // 消费者
    Consumer consumer;
    // 订阅topic
    Result result = client.subscribe(
        // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
        "persistent://pulsar-xxx/sdk_cpp/topic1",
        // 订阅名称
        "sub_topic1",
        consumerConfiguration,
        consumer);
    
    if (result != ResultOk) {
        std::cout << "Failed to subscribe: " << result << std::endl;
        return -1;
    }

    说明:

    • subscriptionName 需要写入订阅名,可在消费管理界面查看。
    • Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。
  1. 消费消息。

    Message msg;
    // 获取消息
    consumer.receive(msg);
    // 模拟业务
    std::cout << "Received: " << msg << "  with payload '" << msg.getDataAsString() << "'" << std::endl;
    // 回复ack
    consumer.acknowledge(msg);
    // 消费失败回复nack, 消息将会重新投递
    // consumer.negativeAcknowledge(msg);
  2. 登录 TDMQ Pulsar 版控制台,依次点击 Topic 管理 > Topic 名称进入消费管理页面,点开订阅名下方右三角号,可查看生产消费记录。

    说明:

    上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 DemoPulsar 官方文档