下载并运行Demo
最近更新时间: 2024-10-17 17:10:00
操作场景
该任务指导您在购买TDMQ服务后,下载 Demo 并进行简单的测试,了解运行一个客户端的操作步骤。
前提条件
已 购买云服务器。
操作步骤
- 下载 Demo ,并配置相关参数。
添加 Maven 依赖 按照 [Pulsar 官方文档] 添加 Maven 依赖。
<!-- in your <properties> block -->
<pulsar.version>2.7.1</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
创建 Client
// 一个Pulsar client对应一个客户端链接
// 原则上一个进程一个client,尽量避免重复创建,消耗资源
PulsarClient client = PulsarClient.builder()
//替换成集群接入地址,位于【集群管理】页面接入地址
.serviceUrl("http://****")
//替换成角色密钥,位于【角色管理】页面
.authentication(AuthenticationFactory.token("eyJr****"))
.build();
System.out.println(">> pulsar client created.");
serviceUrl 即接入地址,可以在控制台【集群管理】接入点页面查看并复制。
token 即角色的密钥,角色密钥可以在【角色管理】中复制。
注意:
密钥泄露很可能导致您的数据泄露,请妥善保管您的密钥。
创建消费者进程
Consumer<byte[]> consumer = client.newConsumer()
.topic("persistent://pulsar-****")//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
.subscriptionName("****")//需要现在控制台或者通过控制台API创建好一个订阅,此处填写订阅名
.subscriptionType(SubscriptionType.Exclusive)//声明消费模式为exclusive(独占)模式
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)//配置从最早开始消费,否则可能会消费不到历史消息
.subscribe();
System.out.println(">> pulsar consumer created.");
- Topic 名称需要填入完整路径,即“
persistent://clusterid/namespace/Topic
”,clusterid/namespace/topic
的部分可以从控制台上【Topic管理】页面直接复制。
- subscriptionName需要写入订阅名,可在【消费管理】界面查看。
创建生产者进程
Producer<byte[]> producer = client.newProducer()
.topic("persistent://pulsar-****")//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
.create();
System.out.println(">> pulsar producer created.");
Topic 名称需要填入完整路径,即“persistent://clusterid/namespace/Topic
”,clusterid/namespace/topic 的部分可以从控制台上【Topic管理】页面直接复制。
生产消息
for (int i = 0; i < 1000; i++) {
String value = "my-sync-message-" + i;
MessageId msgId = producer.newMessage().value(value.getBytes()).send();//发送消息
System.out.println("deliver msg " + msgId + ",value:" + value);
}
producer.close();//关闭生产进程
消费消息
for (int i = 0; i < 1000; i++) {
Message<byte[]> msg = consumer.receive();//接收当前offset对应的一条消息
String msgId = msg.getMessageId().toString();
String value = new String(msg.getValue());
System.out.println("receive msg " + msgId + ",value:" + value);
consumer.acknowledge(msg);//接收到之后必须要ack,否则offset会一直停留在当前消息,无法继续消费
}
在
pom.xml
所在目录执行命令mvn clean package
,或者通过IDE自带的功能打包整个工程,在target目录下生成一个可运行的jar文件。运行成功后将 jar 文件上传到云服务器。
登录云服务器,进入到刚刚上传jar文件所在的目录,可看到文件已上传到云服务器。
- 执行命令 java -jar tdmq-demo-1.0.0.jar,运行 Demo,可查看运行日志。
- 登录 TDMQ 控制台,依次点击【Topic管理】>【Topic名称】进入消费管理页面,点开订阅名下方右三角号,可查看生产消费记录。
- 进入【消息查询】页面,可查看 Demo 运行后的消息轨迹。 消息轨迹如下: