消息重试和死信机制

最近更新时间: 2024-10-17 17:10:00

重试 Topic 是一种为了确保消息被正常消费而设计的 Topic 。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试 Topic 中,当重试达到一定次数后,停止重试,投递到死信 Topic 中。

当消息进入到死信队列中,表示 TDMQ 已经无法自动处理这批消息,一般这时就需要人为介入来处理这批消息。您可以通过编写专门的客户端来订阅死信 Topic,处理这批之前处理失败的消息。

自动重试

相关概念

重试 Topic:一个重试 Topic 是对应一个订阅名(一个订阅者组的唯一标识)的,以 Topic 形式存在于 TDMQ 中。当您新建了一个订阅后,会自动创建一个 “[订阅名]-retry” 的 Topic,该 Topic 会自主实现消息重试的机制。

实现原理

您创建的消费者使用某个订阅名以共享模式订阅了一个 Topic 后,如果开启了 enableRetry 属性,就会自动订阅这个订阅名对应的重试队列。

说明:

仅共享模式支持自动化重试和死信机制,独占和灾备模式不支持。

这里以 Java 语言客户端为例,在 topic1 创建了一个 sub1 的订阅,客户端使用 sub1 订阅名订阅了 topic1 并开启了 enableRetry,如下所示:

Consumer consumer = client.newConsumer()
    .topic("persistent://1******30/default/topic1")
    .subscriptionType(SubscriptionType.Shared)//仅共享消费模式支持重试和死信
    .enableRetry(true)
    .subscriptionName("sub1")
    .subscribe();

此时,topic1sub1 的订阅就形成了带有重试机制的投递模式,sub1 会自动订阅之前在新建订阅时自动创建的 sub1-retry(可以在控制台 Topic 列表中找到)。当 topic1 中的消息投递第一次未收到消费端 ACK 时,这条消息就会被自动投递到重试 Topic sub1-retry,并且由于 consumer 自动订阅了这个主题,后续这条消息会在一定的 重试规则下重新被消费。当达到最大重试次数后仍失败,消息会被投递到对应的死信队列 sub1-dlq,等待人工处理。

自定义参数设置

TDMQ 会默认配置一套重试和死信参数,具体如下:

  • 指定重试次数为16次(失败16次后,第17次会投递到死信队列)

  • 指定重试队列为[订阅名]-retry

  • 指定死信队列为[订阅名]-dlq

如果希望自定义配置这些参数,可以使用 deadLetterPolicy API 进行配置,代码如下:

Consumer<byte[]> consumer = pulsarClient.newConsumer()
    .topic(topic)
    .subscriptionName("sub1")
    .subscriptionType(SubscriptionType.Shared)
    .enableRetry(true)//开启重试消费
    .deadLetterPolicy(DeadLetterPolicy.builder()
          .maxRedeliverCount(maxRedeliveryCount)//可以指定最大重试次数
          .retryLetterTopic("persistent://my-property/my-ns/sub1-retry")//可以指定重试队列
          .deadLetterTopic("persistent://my-property/my-ns/sub1-dlq")//可以指定死信队列
          .build())
    .subscribe();

重试规则

重试规则由 reconsumerLater API 实现,有三种模式:

 //指定任意延迟时间
 consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
 //指定延迟等级
 consumer.reconsumeLater(msg, 1);
 //等级递增
 consumer.reconsumeLater(msg);
  • 第一种:指定任意延迟时间。第二个参数填写延迟时间,第三个参数指定时间单位。延迟时间和延时消息的取值范围一致,范围在1 - 864000(单位:秒)。

  • 第二种:指定任意延迟等级。实现效果和第一种基本一致,更方便统一管理分布式系统中的延时时长,延迟等级说明如下:

    1. reconsumeLater(msg, 1)中的第二个参数即为消息等级

    2. 默认MESSAGE_DELAYLEVEL = &quot;1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h&quot;,这个常数决定了每级对应的延时时间,例如1级对应1s,3级对应10s。如果默认值不符合实际业务需求,用户可以重新自定义。

  • 第三种:等级递增。实现的效果不同于以上两种,为退避式的重试,即第一次失败后重试间隔为1秒,第二次失败后重试间隔为5秒,以此类推,次数越多,间隔时间越长。具体时间间隔同样由第二种中介绍的 MESSAGE_DELAYLEVEL 决定。 这种重试机制往往在业务场景中有更实际的应用,如果消费失败,一般服务不会立刻恢复,使用这种渐进式的重试方式更为合理。

重试消息的消息属性

一条重试消息会给消息带上如下 property。

{
  REAL_TOPIC="persistent://my-property/my-ns/test, 
  ORIGIN_MESSAGE_ID=314:28:-1, 
  RETRY_TOPIC=persistent://my-property/my-ns/my-subscription-retry, 
  RECONSUMETIMES=16
}
  • REAL_TOPIC:原 Topic

  • ORIGIN_MESSAGE_ID:最初生产的消息 ID

  • RETRY_TOPIC:重试 Topic

  • RECONSUMETIMES:代表该消息重试的次数

重试消息的消息 ID 流转

消息 ID 流转过程如下所示,您可以借助此规则对相关日志进行分析。

原始消费: msgid=1:1:0:1
第一次重试: msgid=2:1:-1
第二次重试: msgid=2:2:-1
第三次重试: msgid=2:3:-1
.......
第16次重试: msgid=2:16:0:1
第17次写入死信队列: msgid=3:1:-1

完整代码示例

以下为借助 TDMQ 实现完整消息重试机制的代码示例,供开发者参考。

订阅主题

Consumer<byte[]> consumer1 = client.newConsumer()
        .topic("topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Shared)
        .enableRetry(true)//开启重试消费
        //.deadLetterPolicy(DeadLetterPolicy.builder()
        //         .maxRedeliverCount(maxRedeliveryCount)
        //         .retryLetterTopic("persistent://my-property/my-ns/my-subscription-retry")//可以指定重试队列
        //         .deadLetterTopic("persistent://my-property/my-ns/my-subscription-dlq")//可以指定死信队列
        //         .build())
        .subscribe();

执行消费

while (true) {
      Message msg = consumer.receive();
      try {
            // Do something with the message
            System.out.printf("Message received: %s", new String(msg.getData()));
            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
      } catch (Exception e) {
            // select reconsume policy
            consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
            //consumer.reconsumeLater(msg, 1);
            //consumer.reconsumeLater(msg);
      }
}

主动重试

当消费者在某个时间没有成功消费某条消息,如果想重新消费到这条消息时,消费者可以发送一条取消确认消息到 TDMQ 服务端,TDMQ 会将这条消息重新发给消费者。 这种方式重试时不会产生新的消息,所以也不能自定义重试间隔。

以下为主动重试的 Java 代码示例:

while (true) {
    Message msg = consumer.receive();
    try {
        // Do something with the message
        System.out.printf("Message received: %s", new String(msg.getData()));
        // Acknowledge the message so that it can be deleted by the message broker
        consumer.acknowledge(msg);
    } catch (Exception e) {
        // Message failed to process, redeliver later
        consumer.negativeAcknowledge(msg);
    }
    consumer.negativeAcknowledge(msg);
}