Java SDK
最近更新时间: 2024-06-12 15:06:00
操作场景
本文以调用 Java SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
操作步骤
Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.9.4</version> </dependency>
创建 Pulsar 客户端。
PulsarClient pulsarClient = PulsarClient.builder() // 服务接入地址 .serviceUrl("pulsar://" + SERVICE_URL) // 路由ID .listenerName("custom:" + ROUTERID) // 授权角色密钥 .authentication(AuthenticationFactory.token(AUTHENTICATION)).build();
参数 说明 SERVICE_URL 集群接入地址,可以在控制台 集群管理 接入点页面查看并复制。 AUTHENTICATION 角色密钥,在 角色管理 页面复制密钥列复制。 ROUTERID 创建生产者。
// 构建生产者 Producer<byte[]> producer = pulsarClient.newProducer() // 禁用掉batch功能 .enableBatching(false) // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称 .topic(TOPICNAME).create();
说明:
参数 TOPICNAME 需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。
发送消息。
//发送消息 MessageId msgId = producer.newMessage() // 消息内容 .value("this is a new message.".getBytes(StandardCharsets.UTF_8)) .send();
资源释放。
// 关闭生产者 producer.close(); // 关闭客户端 pulsarClient.close();
创建消费者。
// 构建byte[]类型(默认类型)的消费者 Consumer<byte[]> consumer = pulsarClient.newConsumer() // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制 .topic(TOPICNAME) // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名 .subscriptionName(SUBSCRIPTIONNAME) // 声明消费模式为share(共享)模式 .subscriptionType(SubscriptionType.Shared) // 配置从最早开始消费,否则可能会消费不到历史消息 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) // 订阅 .subscribe();
说明:
- 参数 SUBSCRIPTIONNAME 需要写入订阅名,可在消费管理界面查看。
- 参数 TOPICNAME 需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。
消费消息。
// 接收当前offset对应的一条消息 Message<byte[]> msg = consumer.receive(); MessageId msgId = msg.getMessageId(); String value = new String(msg.getValue()); System.out.println("receive msg " + msgId + ",value:" + value); // 接收到之后必须要ack,否则offset会一直停留在当前消息,导致消息积压 consumer.acknowledge(msg);
使用监听器进行消费。
// 消息监听器 MessageListener<byte[]> myMessageListener = (consumer, msg) -> { try { System.out.println("Message received: " + new String(msg.getData())); // 回复ack consumer.acknowledge(msg); } catch (Exception e) { // 消费失败,回复nack consumer.negativeAcknowledge(msg); } }; pulsarClient.newConsumer() // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制 .topic("persistent://pulsar-mmqwr5xx9n7g/sdk_java/topic1") // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名 .subscriptionName("sub_topic1") // 声明消费模式为exclusive(独占)模式 .subscriptionType(SubscriptionType.Exclusive) // 设置监听器 .messageListener(myMessageListener) // 配置从最早开始消费,否则可能会消费不到历史消息 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe();
登录 TDMQ Pulsar 版控制台,依次点击 Topic 管理 > Topic 名称进入消费管理页面,点开订阅名下方右三角号,可查看生产消费记录。
说明:
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 Demo 或 Pulsar 官方文档。