Java SDK

最近更新时间: 2024-06-12 15:06:00

操作场景

本文以调用 Java SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

操作步骤

  1. Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:

    <dependency>
         <groupId>org.apache.pulsar</groupId>
         <artifactId>pulsar-client</artifactId>
         <version>2.9.4</version>
    </dependency>
  2. 创建 Pulsar 客户端。

    PulsarClient pulsarClient = PulsarClient.builder()
                   // 服务接入地址
                   .serviceUrl("pulsar://" + SERVICE_URL)
                   // 路由ID
                   .listenerName("custom:" + ROUTERID)
                   // 授权角色密钥
            .authentication(AuthenticationFactory.token(AUTHENTICATION)).build();
    参数 说明
    SERVICE_URL 集群接入地址,可以在控制台 集群管理 接入点页面查看并复制。
    AUTHENTICATION 角色密钥,在 角色管理 页面复制密钥列复制。
    ROUTERID
  3. 创建生产者。

    // 构建生产者
    Producer<byte[]> producer = pulsarClient.newProducer()
            // 禁用掉batch功能
            .enableBatching(false)
            // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
            .topic(TOPICNAME).create();
    

    说明:

    参数 TOPICNAME 需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。

  4. 发送消息。

    //发送消息
    MessageId msgId = producer.newMessage()
        // 消息内容
        .value("this is a new message.".getBytes(StandardCharsets.UTF_8))
        .send();
  5. 资源释放。

    // 关闭生产者
    producer.close();
    // 关闭客户端
    pulsarClient.close();
  6. 创建消费者。

    // 构建byte[]类型(默认类型)的消费者
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
        // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
        .topic(TOPICNAME)
        // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
        .subscriptionName(SUBSCRIPTIONNAME)
        // 声明消费模式为share(共享)模式
        .subscriptionType(SubscriptionType.Shared)
        // 配置从最早开始消费,否则可能会消费不到历史消息
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        // 订阅
        .subscribe();

    说明:

    • 参数 SUBSCRIPTIONNAME 需要写入订阅名,可在消费管理界面查看。
    • 参数 TOPICNAME 需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。
  7. 消费消息。

    // 接收当前offset对应的一条消息
    Message<byte[]> msg = consumer.receive();
    MessageId msgId = msg.getMessageId();
    String value = new String(msg.getValue());
    System.out.println("receive msg " + msgId + ",value:" + value);
    // 接收到之后必须要ack,否则offset会一直停留在当前消息,导致消息积压
    consumer.acknowledge(msg);
  8. 使用监听器进行消费。

    // 消息监听器
    MessageListener<byte[]> myMessageListener = (consumer, msg) -> {
        try {
            System.out.println("Message received: " + new String(msg.getData()));
            // 回复ack
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // 消费失败,回复nack
            consumer.negativeAcknowledge(msg);
        }
    };
    pulsarClient.newConsumer()
        // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
        .topic("persistent://pulsar-mmqwr5xx9n7g/sdk_java/topic1")
        // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
        .subscriptionName("sub_topic1")
        // 声明消费模式为exclusive(独占)模式
        .subscriptionType(SubscriptionType.Exclusive)
        // 设置监听器
        .messageListener(myMessageListener)
        // 配置从最早开始消费,否则可能会消费不到历史消息
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
  9. 登录 TDMQ Pulsar 版控制台,依次点击 Topic 管理 > Topic 名称进入消费管理页面,点开订阅名下方右三角号,可查看生产消费记录。

    说明:

    上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 DemoPulsar 官方文档