Spring Boot Starter 接入
最近更新时间: 2025-10-11 18:10:00
操作场景
本文以 Spring Boot Starter 接入为例介绍实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
操作步骤
步骤1:添加依赖
在项目中引入 Pulsar Starter 相关依赖。
<dependency>
<groupId>io.github.majusko</groupId>
<artifactId>pulsar-java-spring-boot-starter</artifactId>
<version>1.0.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.11</version>
</dependency>
步骤2:准备配置
在配置文件中 添加 Pulsar 相关配置信息。
pulsar:
# 命名空间名称
namespace: namespace_java
# 服务接入地址
service-url: http://pulsar-xxx.tdmq.ap-gz.public.tencenttdmq.com:8080
# 授权角色密钥
token-auth-value: eyJrZXlJZC....
# 集群 ID
tenant: pulsar-xxx
| 参数 | 说明 |
| namespace | 命名空间名称,在控制台 命名空间 管理页面中复制。 |
| service-url | 集群接入地址,可以在控制台 集群管理 页面查看并复制。![]() |
| token-auth-value | 角色密钥,在 角色管理 页面复制密钥列复制。![]() |
| tenant | 集群 ID,在控制台 集群管理 页面中获取。 |
步骤3:生产消息
- 生产者工厂配置。
@Configuration
public class ProducerConfiguration {
@Bean
public ProducerFactory producerFactory() {
return new ProducerFactory()
// topic1 使用String类型生产者
.addProducer("topic1", String.class)
// topic2 使用byte[]类型(默认类型)生产者
.addProducer("topic2")
// topic3 使用MyMessage类型生产者 (自定义消息类型)
.addProducer("topic4", MyMessage.class);
}
}.
- 注入生产者。
@Autowired
private PulsarTemplate<byte[]> defaultProducer; // byte[]类型生产者
@Autowired
private PulsarTemplate<String> stringProducer; // String类型生产者
@Autowired
private PulsarTemplate<MyMessage> customProducer; // MyMessage类型生产者 (自定义消息类型)
- 发送消息。
// 发送String类型的消息
stringProducer.send("topic1", "Hello pulsar client.");
// 发送MyMessage类型消息 (自定义消息类型)
MyMessage myMessage = new MyMessage();
myMessage.setData("Hello client, this is a custom message.");
myMessage.setSendDate(new Date());
customProducer.send("topic4", myMessage);
// 发送byte[]类型消息
defaultProducer.send("topic2", ("Hello pulsar client, this is a order message" + i + ".").getBytes(StandardCharsets.UTF_8));
注意:
- 发送消息的 Topic 是在生产者配置中已经声明的 Topic。
- PulsarTemplate 类型应与发送消息的类型一致。
- 发送消息到指定 Topic 时,消息类型需要与生产者工厂配置中的 Topic 绑定的消息类型对应。
步骤4:消费消息
消费者配置。
@PulsarConsumer(topic = "topic1", // 订阅topic名称
subscriptionName = "sub_topic1", // 订阅名称
clazz = String.class, // 消息类型,需要与生产者保持一致,绑定后不能修改类型
serialization = Serialization.JSON, // 序列化方式
subscriptionType = SubscriptionType.Shared, // 订阅模式,默认为独占模式
consumerName = "firstTopicConsumer", // 消费者名称
maxRedeliverCount = 3, // 最大重试次数
deadLetterTopic = "sub_topic1-DLQ" // 死信topic名称
)
public void topicConsume(String msg) {
// TODO process your message
System.out.println("Received a new message. content: [" + msg + "]");
// 如果消费失败,请抛出异常,这样消息会进入重试队列,之后可以重新消费,直到达到最大重试次数之后,进入死信队列。前提是要创建重试和死信topic
}
步骤5:查询消息
登录控制台,进入 消息查询 页面,可查看 Demo 运行后的消息轨迹。
消息轨迹如下:

说明:
以上是基于 Springboot Starter 方式对 Pulsar 简单使用的配置。详细使用可参见 Demo 或 Starter 文档。

