消息重试与死信机制
最近更新时间: 2024-10-17 17:10:00
重试 Topic 是一种为了确保消息被正常消费而设计的 Topic 。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试 Topic 中,当重试达到一定次数后,停止重试,投递到死信 Topic 中。
当消息进入到死信队列中,表示 TDMQ Pulsar 版已经无法自动处理这批消息,一般这时就需要人为介入来处理这批消息。您可以通过编写专门的客户端来订阅死信 Topic,处理这批之前处理失败的消息。
自动重试
相关概念
重试 Topic:一个重试 Topic 对应一个订阅名(一个订阅者组的唯一标识),以 Topic 形式存在于 TDMQ Pulsar 版中。当您在控制台新建订阅,并打开自动创建重试&死信队列,系统会自动创建重试 Topic,该 Topic 会自主实现消息重试的机制。
该 Topic 命名为:
2.7.2 版本集群:[订阅名]-RETRY
2.6.1 版本集群:[订阅名]-retry
实现原理
您创建的消费者使用某个订阅名以共享模式订阅了一个 Topic 后,如果开启了 enableRetry
属性,就会自动订阅这个订阅名对应的重试队列。
说明
- 仅共享模式(包括 Key 共享)支持自动化重试和死信机制,独占和灾备模式不支持。
- 注意客户端版本需要与集群版本保持一致,客户端才能准确识别自动创建出的重试、死信队列。
这里以 Java 语言客户端为例,在 topic1
创建了一个 sub1
的订阅,客户端使用 sub1
订阅名订阅了 topic1
并开启了 enableRetry
,如下所示:
Consumer consumer = client.newConsumer()
.topic("persistent://1******30/my-ns/topic1")
.subscriptionType(SubscriptionType.Shared)//仅共享消费模式支持重试和死信
.enableRetry(true)
.subscriptionName("sub1")
.subscribe();
此时,topic1
对 sub1
的订阅就形成了带有重试机制的投递模式,sub1
会自动订阅之前在新建订阅时自动创建的重试 Topic 中(可以在控制台 Topic 列表中找到)。当 topic1
中的消息投递第一次未收到消费端 ACK 时,这条消息就会被自动投递到重试 Topic ,并且由于 consumer 自动订阅了这个主题,后续这条消息会在一定的 重试规则 下重新被消费。当达到最大重试次数后仍失败,消息会被投递到对应的死信队列,等待人工处理。
说明
如果是 client 端自动创建的订阅,可以通过控制台上的 Topic 管理 > 更多 > 查看订阅进入消费管理页面手动重建重试和死信队列。
自定义参数设置
TDMQ Pulsar 版会默认配置一套重试和死信参数,具体如下:
2.7.2 版本集群
指定重试次数为16次(失败16次后,第17次会投递到死信队列)
指定重试队列为
[订阅名]-RETRY
指定死信队列为
[订阅名]-DLQ
2.6.1 版本集群
指定重试次数为16次(失败16次后,第17次会投递到死信队列)
指定重试队列为
[订阅名]-retry
指定死信队列为
[订阅名]-dlq
如果希望自定义配置这些参数,可以使用 deadLetterPolicy
API 进行配置,代码如下:
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("sub1")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)//开启重试消费
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)//可以指定最大重试次数
.retryLetterTopic("persistent://my-property/my-ns/sub1-retry")//可以指定重试队列
.deadLetterTopic("persistent://my-property/my-ns/sub1-dlq")//可以指定死信队列
.build())
.subscribe();
重试规则
重试规则由 reconsumerLater
API 实现,有三种模式:
//指定任意延迟时间
consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
//指定延迟等级
consumer.reconsumeLater(msg, 1);
//等级递增
consumer.reconsumeLater(msg);
第一种:指定任意延迟时间。第二个参数填写延迟时间,第三个参数指定时间单位。延迟时间和延时消息的取值范围一致,范围在1 - 864000(单位:秒)。
第二种:指定任意延迟等级(仅限存量腾讯云版SDK的用户使用)。实现效果和第一种基本一致,更方便统一管理分布式系统中的延时时长,延迟等级说明如下:
reconsumeLater(msg, 1)
中的第二个参数即为消息等级。默认
MESSAGE_DELAYLEVEL = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
,这个常数决定了每级对应的延时时间,例如1级对应1s,3级对应10s。如果默认值不符合实际业务需求,用户可以重新自定义。
第三种:等级递增(仅限存量腾讯云版 SDK 的用户使用)。实现的效果不同于以上两种,为退避式的重试,即第一次失败后重试间隔为1秒,第二次失败后重试间隔为5秒,以此类推,次数越多,间隔时间越长。具体时间间隔同样由第二种中介绍的
MESSAGE_DELAYLEVEL
决定。 这种重试机制往往在业务场景中有更实际的应用,如果消费失败,一般服务不会立刻恢复,使用这种渐进式的重试方式更为合理。
注意
如果您使用的是 Pulsar 社区的 SDK,则不支持延迟等级和等级递增两种模式。
重试消息的消息属性
一条重试消息会给消息带上如下 property。
{
REAL_TOPIC="persistent://my-property/my-ns/test,
ORIGIN_MESSAGE_ID=314:28:-1,
RETRY_TOPIC="persistent://my-property/my-ns/my-subscription-retry,
RECONSUMETIMES=16
}
REAL_TOPIC
:原 Topic。ORIGIN_MESSAGE_ID
:最初生产的消息 ID。RETRY_TOPIC
:重试 Topic。RECONSUMETIMES
:代表该消息重试的次数。
重试消息的消息 ID 流转
消息 ID 流转过程如下所示,您可以借助此规则对相关日志进行分析。
原始消费: msgid=1:1:0:1
第一次重试: msgid=2:1:-1
第二次重试: msgid=2:2:-1
第三次重试: msgid=2:3:-1
.......
第16次重试: msgid=2:16:0:1
第17次写入死信队列: msgid=3:1:-1
完整代码示例
重试(-RETRY)Topic 需要在 Consumer 中首先开启该功能(enableRetry(true)),默认为关闭状态。之后需要调用 reconsumeLater() 的接口消息才会被发送到重试 Topic 中。
死信(-DLQ)Topic 需要调用 consumer.reconsumeLater(),执行 reconsumeLater 之后原 topic 的那条消息会被 ack,消息转存到 retry topic,重试到达上限后消息转存至死信。Pulsar Client 会自动订阅 retry topic,但是进入死信就不会自动订阅,需要用户自己来订阅。
以下为借助 TDMQ Pulsar 版实现完整消息重试机制的代码示例,供开发者参考。
订阅主题
Consumer<byte[]> consumer1 = client.newConsumer()
.topic("persistent://pulsar-****")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)//开启重试消费
//.deadLetterPolicy(DeadLetterPolicy.builder()
// .maxRedeliverCount(maxRedeliveryCount)
// .retryLetterTopic("persistent://my-property/my-ns/my-subscription-retry")//可以指定重试队列
// .deadLetterTopic("persistent://my-property/my-ns/my-subscription-dlq")//可以指定死信队列
// .build())
.subscribe();
执行消费
while (true) {
Message msg = consumer.receive();
try {
// Do something with the message
System.out.printf("Message received: %s", new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// select reconsume policy
consumer.reconsumeLater(msg, 1000L, TimeUnit.MILLISECONDS);
//consumer.reconsumeLater(msg, 1);
//consumer.reconsumeLater(msg);
}
}
主动重试
当消费者在某个时间没有成功消费某条消息,如果想重新消费到这条消息时,消费者可以发送一条取消确认消息到 TDMQ Pulsar 版服务端,TDMQ Pulsar 版会将这条消息重新发给消费者。 这种方式重试时不会产生新的消息,所以也不能自定义重试间隔。
以下为主动重试的 Java 代码示例:
while (true) {
Message msg = consumer.receive();
try {
// Do something with the message
System.out.printf("Message received: %s", new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}