rocketmq(二十)消息消费重试机制(代码片段)

爱是与世界平行 爱是与世界平行     2022-12-02     660

关键词:

1、顺序消息的消费重试

对于顺序消息,当Consumer消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。消费重试默认间隔时间为1000毫秒。重试期间应用会出现消息消费被阻塞的情况。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 顺序消息消费失败的消费重试时间间隔,单位毫秒,默认为1000,其取值范围为[10,30000]
consumer.setSuspendCurrentQueueTimeMillis(100);

由于对顺序消息的重试是无休止的,不间断的,直至消费成功,所以,对于顺序消息的消费,务必要保证应用能够及时监控并处理消费失败的情况,避免消费被永久性阻塞。

注意,顺序消息没有发送失败重试机制,但具有消费失败重试机制。

2、无序消息的消费重试

对于无序消息(普通消息、延时消息、事务消息),当Consumer消费消息失败时,可以通过设置返回状态达到消息重试的效果。不过需要注意,无序消息的重试只对集群消费方式生效,广播消费方式不提供失败重试特性。即对于广播消费,消费失败后,失败消息不再重试,继续消费后续消息。

3、消费重试次数与间隔

对于无序消息集群消费下的重试消费,每条消息默认最多重试16次,但每次重试的间隔时间是不同的,会逐渐变长。每次重试的间隔时间如下表。

重试次数与上次重试的时间间隔重试次数与上次重试的时间间隔
110秒97分钟
230秒108分钟
31分钟119分钟
42分钟1210分钟
53分钟1320分钟
64分钟1430分钟
75分钟151小时
86分钟162小时

若一条消息在一直消费失败的前提下,将会在正常消费后的第 4小时46分 后进行第16次重试。

若仍然失败,则将消息投递到 死信队列

修改消费重试次数

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 修改消费重试次数
consumer.setMaxReconsumeTimes(10);

对于修改过的重试次数,将按照以下策略执行:

  • 若修改值小于16,则按照指定间隔进行重试
  • 若修改值大于16,则超过16次的重试时间间隔均为2小时

对于Consumer Group,若仅修改了一个Consumer的消费重试次数,则会应用到该Group中所有其它Consumer实例。若出现多个Consumer均做了修改的情况,则采用覆盖方式生效。即最后被修改的值会覆盖前面设置的值

4、重试队列

对于需要重试消费的消息,并不是Consumer在等待了指定时长后再次去拉取原来的消息进行消费,而是将这些需要重试消费的消息放入到了一个特殊Topic的队列中,而后进行再次消费的。这个特殊的队列就是重试队列。

当出现需要进行重试消费的消息时,Broker会为每个消费组都设置一个Topic名称为%RETRY%consumerGroup@consumerGroup 的重试队列。

1)这个重试队列是针对消费组的,而不是针对每个Topic设置的(一个Topic的消息可以让多个消费者组进行消费,所以会为这些消费者组各创建一个重试队列)

2)只有当出现需要进行重试消费的消息时,才会为该消费者组创建重试队列

注意,消费重试的时间间隔与延时消费 的延时等级 十分相似,除了没有延时等级的前两个时间外,其它的时间都是相同的

Broker对于重试消息的处理是通过延时消息实现的。先将消息保存到SCHEDULE_TOPIC_XXXX延迟队列中,延迟时间到后,会将消息投递到%RETRY%consumerGroup@consumerGroup重试队列中。

5、消费重试配置方式

集群消费方式下,消息消费失败后若希望消费重试,则需要在消息监听器接口的实现中明确进行如下三种方式之一的配置:

  • 方式1:返回ConsumeConcurrentlyStatus.RECONSUME_LATER(推荐)
  • 方式2:返回Null
  • 方式3:抛出异常

6、消费不重试配置方式

集群消费方式下,消息消费失败后若不希望消费重试,则在捕获到异常后同样也返回与消费成功后的相同的结果,即ConsumeConcurrentlyStatus.CONSUME_SUCCESS,则不进行消费重试。

rocketmq-消费重试机制

介绍:  RocketMQ的消息重试及时分为两种,一种是Producer端重试,一种是Consume端重试。  1、Producer端重试:    1.1消息发没发成功,默认情况下是3次重试。  2、Consumer端重试:    2.1exception的情况,一般重复16次1... 查看详情

rocketmq源码—八rocketmq消息重试(代码片段)

RocketMQ的消息重试包含了producer发送消息的重试和consumer消息消费的重试。producer发送消息重试producer在发送消息的时候如果发送失败了,RocketMQ会自动重试。privateSendResultsendDefaultImpl(Messagemsg,finalCommunicationModecommunicationMode,finalSendCal... 查看详情

rocketmq——消息重试(代码片段)

文章目录一、Producer端重试二、Consumer端重试1.Exception2.Timeout总结对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因此就有了消息失败重试机制。很显示,消息重试分为2种:Producer端重试和Consumer端重试。一、... 查看详情

rocketmq(代码片段)

RocketMQ生产者和消费者  注:生产者在生产数据时,指定数据的key,然后消费者进行数据消费时,获取到key,与redis中保存的key做判断  如果不相同代表之前没有人进行消费,处理消费,保存到redis当中  当有第二个消费者... 查看详情

rocketmq是是如何管理消费进度的?又是如何保证消息成功消费的?(代码片段)

RocketMQ消费者保障消息确认机制consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)什么是ACK消息确认机... 查看详情

rocketmq笔记:普通消息(代码片段)

  普通消息为RocketMQ中最基础的消息,支持生产者和消费者的异步解耦通信。一、普通消息的生命周期 1、初始化  消息被生产者构建并完成初始化,待发送到服务端的状态。2、待消费  消息被发送到服务端,对消费... 查看详情

源码分析rocketmq系列索引

1、RocketMQ源码分析之NameServer2、RocketMQ源码分析之Broker概述与同步消息发送原理与高可用设计及思考3、源码分析RocketMQ之CommitLog消息存储机制4、源码分析RocketMQ之消息消费5、源码分析RocketMQ消息消费机制----消费者拉取消息机制6、... 查看详情

rocketmq笔记:顺序消息(代码片段)

...是可以按照消息的发送顺序来消费(FIFO)。  顺序消息是RocketMQ提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ顺序消... 查看详情

rocketmq(二十二)高级功能汇总(代码片段)

1、消息存储分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。消息生成者发送消息MQ收到消息,将消息进行持久化,在存储中新增一条记录返回ACK给生产者MQpush消息给对应的消费者,然后等待消费... 查看详情

rocketmq快速入门:消息发送延迟消息消费重试(代码片段)

...分享🐳。偶尔认知思考、日常水文🐌。目录1、RocketMQ消息结构1.1、消息结构1.2、三种消息发松方式2、快速搭建工程2.1、创建rocketmq-demo父工程2.2、生产者工程2.3、消费者工程2.4、消息发送过程3、消息发送过程4、三种消... 查看详情

rocketmq源码(24)—defaultmqpushconsumer延迟消息源码(代码片段)

基于RocketMQrelease-4.9.3,深入的介绍了DefaultMQPushConsumer延迟消息源码。文章目录1load加载延迟消息数据1.1parseDelayLevel解析延迟等级2start启动调度消息服务3DeliverDelayedMessageTimerTask投递延迟消息任务3.1executeOnTimeup执行延迟消息投递3... 查看详情

rocketmq的消息重试(消息重投)

详细介绍了RocketMQ的消息重试机制,RocketMQ的消息重试可以分为生产者重试和消费者重试两个部分。文章目录1生产者重试2消费者重试2.1异常重试2.2超时重试1生产者重试生产者在发送消息时,同步消息失败会重投,异... 查看详情

rocketmq源码—九rocketmq延时消息(代码片段)

...式,都是通过设置消息被投递的时间来实现的,但是ApacheRocketMQ在版本4.2.0中尚不支持指定时间的 查看详情

rocketmq源码—九rocketmq延时消息(代码片段)

...式,都是通过设置消息被投递的时间来实现的,但是ApacheRocketMQ在版本4.2.0中尚不支持指定时间的 查看详情

rocketmq的消息重试

...术A对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ会自动不断进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败... 查看详情

rocketmq:消息ack机制源码解析(代码片段)

消息消费进度概述消费者消费消息过程中,为了避免消息的重复消费,应将消息消费进度保存起来,当其他消费者再对消息进行消费时,读取已消费的消息偏移量,对之后的消息进行消费即可。消息模式分为... 查看详情

rocketmq使用(代码片段)

rocketmq 基本使用可以看官网和官网给的demo.https://github.com/apache/rocketmq/tree/master/example这里主要说明几个点:rocketmq  发送类型常用:1,普通消息.(可以获取发送结果,失败了重试)2,有序消息.(秒杀,等需要有序的消费场景)3,事... 查看详情

深入剖析rocketmq源码-负载均衡机制(代码片段)

目录一、引言二、RocketMQ的整体架构三、producer消息生产过程3.1路由同步过程3.2 负载均衡过程四、consumer消息消费过程4.1路由同步过程4.2 负载均衡过程五、RocketMQ指定机器消费设计思路六、小结一、引言RocketMQ是一款优秀的分布... 查看详情