rocketmq的消息重试(消息重投)

刘Java 刘Java     2023-01-27     204

关键词:

详细介绍了RocketMQ的消息重试机制,RocketMQ的消息重试可以分为生产者重试和消费者重试两个部分。

1 生产者重试

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。

如下生产者属性可以设置消息重试策略:

  1. retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。如果因为超时,那么便不再重试。
  2. retryTimesWhenSendAsyncFailed:异步发送失败重试次数,默认为2,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。如果因为超时,那么便不再重试。
  3. retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。

两个注意点:

  1. 如果同步模式发送失败,则轮转到下一个Broker,如果异步模式发送失败,则只会在当前Broker进行重试。
  2. 发送消息超时时间默认3000毫秒,如果因为超时,那么便不再尝试重试。

2 消费者重试

Consumer消费消息失败后,要提供一种重试机制,令消息至少再消费一次。通常引起消息消费重试的时候包括两种情况:异常重试和超时重试。另外,Consumer在广播模式下重试失效。

2.1 异常重试

由于Consumer端逻辑出现了异常,导致没有返回SUCCESS状态,那么Broker就会在一段时间后尝试重试。

RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息,每个Consumer实例在启动的时候就默认订阅了该消费组的重试队列Topic。

考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大(实际上就是配置的延时队列的级别level)。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。

SubscriptionGroupConfig的配置中,默认最大重试16次(retryMaxTimes属性,默认重level3开始),但如果Consumer端逻辑出现异常,重试太多次也没有很大的意义,因此我们可以在代码中指定最大的重试次数,达到一定次数之后就返回SUCCESS,不再重试,对于失败的消息记录到数据库的表中,后续人工处理。

以上是针对并行消费的重试策略,即MessageListener为MessageListenerConcurrently。如果是串行消费策略,即MessageListener为MessageListenerOrderly,那么将会暂停后续消费进度,对于该消息无限的进行重试投递,这是需要特别注意的。

可通过consumer 客户端参数MaxReconsumeTimes设置最大重试次数,超过最大重试次数,消息将被转移到死信队列,范围是**-1 – 16之间。对于有序消费模式MessageListenerOrderly**,默认**-1**,表示无限次本地立即重试消费;对于并发无序消费模式MessageListenerConcurrently,默认16次延时消费,从Level3开始。

2.2 超时重试

如果Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回任何消费状态(成功或者失败),Broker就会认为Consumer消费超时,此时会发起超时重试,默认的消费超时时间为15分钟,时间足够的长了。

RocketMQ会认为该消息没有发送到Consumer端,会一直不停的发送,即无限重试。

相关文章:

RocketMQ

如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!

rocketmq源码—八rocketmq消息重试(代码片段)

RocketMQ的消息重试包含了producer发送消息的重试和consumer消息消费的重试。producer发送消息重试producer在发送消息的时候如果发送失败了,RocketMQ会自动重试。privateSendResultsendDefaultImpl(Messagemsg,finalCommunicationModecommunicationMode,finalSendCal... 查看详情

rocketmq的消息重试

...术A对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ会自动不断进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败... 查看详情

rocketmq(二十)消息消费重试机制(代码片段)

1、顺序消息的消费重试对于顺序消息,当Consumer消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。消费重试默认间隔时间为1000毫秒。重试期间应用会出现消息消费被阻塞的... 查看详情

rocketmq——消息重试(代码片段)

文章目录一、Producer端重试二、Consumer端重试1.Exception2.Timeout总结对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因此就有了消息失败重试机制。很显示,消息重试分为2种:Producer端重试和Consumer端重试。一、... 查看详情

rocketmq:死信队列和消息幂等

参考技术A上一篇《RocketMQ:消息重试》中我们提到当一条消息消费失败时,RocketMQ会进行一定次数的重试。重试的结果也很简单,无非就是在第N次重试时,被成功消费。或者就是经过M次重试后,仍然没有被消息。这通常是由于... 查看详情

rocketmq快速入门:消息发送延迟消息消费重试(代码片段)

...分享🐳。偶尔认知思考、日常水文🐌。目录1、RocketMQ消息结构1.1、消息结构1.2、三种消息发松方式2、快速搭建工程2.1、创建rocketmq-demo父工程2.2、生产者工程2.3、消费者工程2.4、消息发送过程3、消息发送过程4、三种消... 查看详情

rocketmq笔记:顺序消息(代码片段)

...是可以按照消息的发送顺序来消费(FIFO)。  顺序消息是RocketMQ提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ顺序消... 查看详情

rocketmq源码—九rocketmq延时消息(代码片段)

...式,都是通过设置消息被投递的时间来实现的,但是ApacheRocketMQ在版本4.2.0中尚不支持指定时间的 查看详情

rocketmq源码—九rocketmq延时消息(代码片段)

...式,都是通过设置消息被投递的时间来实现的,但是ApacheRocketMQ在版本4.2.0中尚不支持指定时间的 查看详情

rocketmq源码(24)—defaultmqpushconsumer延迟消息源码(代码片段)

基于RocketMQrelease-4.9.3,深入的介绍了DefaultMQPushConsumer延迟消息源码。文章目录1load加载延迟消息数据1.1parseDelayLevel解析延迟等级2start启动调度消息服务3DeliverDelayedMessageTimerTask投递延迟消息任务3.1executeOnTimeup执行延迟消息投递3... 查看详情

rocketmq消息发送

 消息发送基本流程:  1、消息验证    验证主题(topic),消息体不能为空和大小不能超过4M。  2、路由查找    a、查看缓存,是否有topic的路由信息。    b、如果没有则到NameServer中获取路由信息,如果... 查看详情

rocketmq和kafka到底选哪个

参考技术A1、适用场景kafka适合日志处理rocketmq适合业务处理结论:两者没有区别,根据具体业务定夺2、性能kafka单机写入TPS号称在百万条/秒rocketmq大约在10万条/秒结论:追求性能方面,kafka单机性能更高3、可靠性kafka使用异步刷... 查看详情

rocketmq-消费重试机制

介绍:  RocketMQ的消息重试及时分为两种,一种是Producer端重试,一种是Consume端重试。  1、Producer端重试:    1.1消息发没发成功,默认情况下是3次重试。  2、Consumer端重试:    2.1exception的情况,一般重复16次1... 查看详情

rocketmq(代码片段)

RocketMQ生产者和消费者  注:生产者在生产数据时,指定数据的key,然后消费者进行数据消费时,获取到key,与redis中保存的key做判断  如果不相同代表之前没有人进行消费,处理消费,保存到redis当中  当有第二个消费者... 查看详情

rocketmq保证消息不丢失

...cer消息存储在内存中,然后发生宕机,就会导致消息丢失RocketMQ的持久化消息有两种方式:同步刷盘:Broker收到消息后会在持久化到磁盘完成后才发送ack异步刷盘:Broker收到消息存到内存后返回ack,然后Broker定 查看详情

rocketmq重试队列与死信队列简介

1、消费者异常了怎么办?假设我们使用RocketMQ作为消息中间件,传输订单相关的数据,消费者拿到数据后,执行一些后续处理,比如调用物流系统,准备发货。如果这时候,物流系统的数据库宕机了... 查看详情

rocketmq的死信队列

...丢弃,而是将其发送到该消费者对应的特殊队列中。RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-LetterMessage),将存储死信消息的特殊队列称为死信队列(Dead-LetterQueue),简称DLQ。如... 查看详情

rocketmq的死信队列

...丢弃,而是将其发送到该消费者对应的特殊队列中。RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-LetterMessage),将存储死信消息的特殊队列称为死信队列(Dead-LetterQueue),简称DLQ。如... 查看详情