下载并运行Demo

最近更新时间: 2024-06-12 15:06:00

操作场景

该任务指导您在购买TDMQ服务后,下载 Demo 并进行简单的测试,了解运行一个客户端的操作步骤。

前提条件

已 购买云服务器。

操作步骤

  1. 下载 Demo ,并配置相关参数。

    添加 Maven 依赖 按照 [Pulsar 官方文档] 添加 Maven 依赖。

    <!-- in your <properties> block -->
    <pulsar.version>2.7.1</pulsar.version>
    <!-- in your <dependencies> block -->
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>${pulsar.version}</version>
    </dependency>

创建 Client

// 一个Pulsar client对应一个客户端链接

// 原则上一个进程一个client,尽量避免重复创建,消耗资源

PulsarClient client = PulsarClient.builder()
        //替换成集群接入地址,位于【集群管理】页面接入地址
        .serviceUrl("http://****")
        //替换成角色密钥,位于【角色管理】页面
        .authentication(AuthenticationFactory.token("eyJr****"))
        .build(); 

System.out.println(">> pulsar client created.");
  • serviceUrl 即接入地址,可以在控制台【集群管理】接入点页面查看并复制。

  • token 即角色的密钥,角色密钥可以在【角色管理】中复制。

    注意:

    密钥泄露很可能导致您的数据泄露,请妥善保管您的密钥。

创建消费者进程

Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://pulsar-****")//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
                .subscriptionName("****")//需要现在控制台或者通过控制台API创建好一个订阅,此处填写订阅名
                .subscriptionType(SubscriptionType.Exclusive)//声明消费模式为exclusive(独占)模式
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)//配置从最早开始消费,否则可能会消费不到历史消息
                .subscribe();
        System.out.println(">> pulsar consumer created.");
  • Topic 名称需要填入完整路径,即“persistent://clusterid/namespace/Topic”,clusterid/namespace/topic的部分可以从控制台上【Topic管理】页面直接复制。

  • subscriptionName需要写入订阅名,可在【消费管理】界面查看。

创建生产者进程

Producer<byte[]> producer = client.newProducer()
                .topic("persistent://pulsar-****")//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
                .create();
        System.out.println(">> pulsar producer created.");

Topic 名称需要填入完整路径,即“persistent://clusterid/namespace/Topic”,clusterid/namespace/topic 的部分可以从控制台上【Topic管理】页面直接复制。

生产消息

for (int i = 0; i < 1000; i++) {
            String value = "my-sync-message-" + i;
            MessageId msgId = producer.newMessage().value(value.getBytes()).send();//发送消息
            System.out.println("deliver msg " + msgId + ",value:" + value);
        }
        producer.close();//关闭生产进程

消费消息

for (int i = 0; i < 1000; i++) {
            Message<byte[]> msg = consumer.receive();//接收当前offset对应的一条消息
            String msgId = msg.getMessageId().toString();
            String value = new String(msg.getValue());
            System.out.println("receive msg " + msgId + ",value:" + value);
            consumer.acknowledge(msg);//接收到之后必须要ack,否则offset会一直停留在当前消息,无法继续消费
        }
  1. pom.xml 所在目录执行命令mvn clean package,或者通过IDE自带的功能打包整个工程,在target目录下生成一个可运行的jar文件。

  2. 运行成功后将 jar 文件上传到云服务器。

  3. 登录云服务器,进入到刚刚上传jar文件所在的目录,可看到文件已上传到云服务器。

  4. 执行命令 java -jar tdmq-demo-1.0.0.jar,运行 Demo,可查看运行日志。

  5. 登录 TDMQ 控制台,依次点击【Topic管理】>【Topic名称】进入消费管理页面,点开订阅名下方右三角号,可查看生产消费记录。

  6. 进入【消息查询】页面,可查看 Demo 运行后的消息轨迹。 消息轨迹如下: