rocketmq的顺序消息(顺序消费)(代码片段)

刘Java 刘Java     2023-01-23     601

关键词:

简单介绍了消息有序性的概念,以及RocketMQ如何实现消息的顺序消费。

1 消息的有序性

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。

顺序消息分为全局顺序消息与部分顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

如果想要实现全局顺序消息,那么只能使用一个队列,以及单个生产者,这是会严重影响性能。

因此,我们常说的顺序消息通常是只的部分顺序消息,就上面的例子来说,我们不用管不同的订单ID的消息之间的总体消费顺序,只需要保证同样订单ID的消息能按照订单创建、订单付款、订单完成这个顺序消费就可以了。

顺序消费实际上有两个核心点,一个是生产者有序存储,另一个是消费者有序消费。

2 生产者有序发送

先看如何实现生产者有序存储。我们知道RocketMQ中生产者生产的消息会放置在某个队列中,基于队列先进先出的特性天然的可以保证存入队列的消息顺序和拉取的消息顺序是一致的,因此,我们只需要保证一组相同的消息按照给定的顺序存入同一个队列中,就能保证生产者有序存储。

普通发送消息的模式下,生产者会采用轮询的方式将消费均匀的分发到不同的队列中,然后被不同的消费者消费,因为一组消息在不同的队列,此时就无法使用 RocketMQ 带来的队列有序特性来保证消息有序性了。

这个问题很好解决,因为RocketMQ支持生产者在投放消息的时候自定义投放策略,我们实现一个MessageQueueSelector接口,使用Hash取模法来保证同一个订单在同一个队列中就行了,即通过订单ID%队列数量得到该ID的订单所投放的队列在队列列表中的索引,然后该订单的所有消息都会被投放到这个队列中。

生产者发送消息的方法中就有一些添加队列选择器的方法,保证消息发送顺序。

比如只有两个队列,那么订单ID为1,2,3的三组消息中,1、3组消息存放于第一个队列,而2组消息存放于第二个队列,如下图是一种消息可能的消息存放顺序:

根据上图可以,上面的方法可以实现一组消息被顺序的存放,不同组的消息之间的顺序无法保证,这就是部分顺序。

另外,顺序消息必须使用同步发送的方式才能保证生产者发送的消息有序。

实际上,采用队列选择器的方法不能保证消息的严格顺序,我们的目的是将消息发送到同一个队列中,如果某个broker挂了,那么队列就会减少一部分,如果采用取余的方式投递,将可能导致同一个业务中的不同消息被发送到不同的队列中,导致同一个业务的不同消息被存入不同的队列中,短暂的造成部分消息无序。同样的,如果增加了服务器,那么也会造成短暂的造成部分消息无序。

3 消费者有序消费

生产者有序存储实现了,那么该如何实现消费者有序消费呢?RockerMQ的MessageListener回调函数提供了两种消费模式,有序消费模式MessageListenerOrderly和并发消费模式MessageListenerConcurrently。

在消费的时候,还需要保证消费者注册MessageListenerOrderly类型的回调接口实现顺序消费,如果消费者采用Concurrently并行消费,则仍然不能保证消息消费顺序。

实际上,每一个消费者的的消费端都是采用线程池实现多线程消费的模式,即消费端是多线程消费。虽然MessageListenerOrderly被称为有序消费模式,但是仍然是使用的线程池去消费消息。

MessageListenerConcurrently是拉取到新消息之后就提交到线程池去消费,而MessageListenerOrderly则是通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据。

MessageListenerOrderly的加锁机制:

  1. 消费者在进行某个队列的消息拉取时首先向Broker服务器申请队列锁,如果申请到琐,则拉取消息,否则放弃消息拉取,等到下一个队列负载周期(20s)再试。这一个锁使得一个MessageQueue同一个时刻只能被一个消费客户端消费,防止因为队列负载均衡导致消息重复消费。
  2. 假设消费者对messageQueue的加锁已经成功,那么会开始拉取消息,拉取到消息后同样会提交到消费端的线程池进行消费。但在本地消费之前,会先获取该messageQueue对应的锁对象,每一个messageQueue对应一个锁对象,获取到锁对象后,使用synchronized阻塞式的申请线程级独占锁。这一个锁使得来自同一个messageQueue的消息在本地的同一个时刻只能被一个消费客户端中的一个线程顺序的消费。
  3. 在本地加synchronized锁成功之后,还会判断如果是广播模式,则直接进行消费,如果是集群模式,则判断如果messagequeue没有锁住或者锁过期(默认30000ms),那么延迟100ms后再次尝试向Broker 申请锁定messageQueue,锁定成功后重新提交消费请求。

目前来说,消费者使用MessageListenerOrderly顺序消费有个两个问题:

  1. 使用了很多的锁,降低了吞吐量。
  2. 前一个消息消费阻塞时后面消息都会被阻塞。如果遇到消费失败的消息,会自动对当前消息进行重试(每次间隔时间为1秒),无法自动跳过,重试最大次数是Integer.MAX_VALUE,这将导致当前队列消费暂停,因此通常需要设定有一个最大消费次数,以及处理好所有可能的异常情况。

相关文章:

RocketMQ

如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq顺序消费(代码片段)

部分内容出处  https://www.jianshu.com/p/453c6e7ff81crocketmq内部有4个默认的队里,在发送消息时,同一组的消息需要按照顺序,发送到相应的mq中,同一组消息按照顺序进行消费,不同组的消息可以并行的进行消费。下面看一下pr... 查看详情

rocketmq(十四)顺序消息(代码片段)

1、什么是顺序消息顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。默认情况下生产者会把消息以RoundRobin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发... 查看详情

rocketmq(04)——发送顺序消息(代码片段)

发送顺序消息如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代码我们在发送消息的时候,调用的是... 查看详情

rocketmq源码(22)—consumemessageorderlyservice顺序消费消息源码(代码片段)

基于RocketMQrelease-4.9.3,深入的介绍了ConsumeMessageOrderlyService顺序消费消息源码。此前我们学习了consumer消息的拉取流程源码:RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码RocketMQ源码(19)—Broker处理DefaultMQPushC... 查看详情

rocketmq-消息消费模式顺序消费(代码片段)

RocketMQ-消息消费模式顺序消费RocketMQ-消息消费模式集群模式集群模式的演示(本身就默认)Rocketmq存储队列广播模式顺序消费如何改实现顺序消费RocketMQ-消息消费模式集群模式在消费模式为集群的情况下,如果机器是集群的,消息只会... 查看详情

rocketmq源码阅读----顺序消息(代码片段)

一、概述顺序消息的大致原理是发送的时候,比如同一个订单 id 的发送到同一个 queueId 中,如下单、支付、扣库存这个流程需要保证同一个订单 id 消息有序才能正常执行。在消费的时候,也只能有一个 consumer并... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq——ordermessage(顺序消息)(代码片段)

生产者端消费者端运行效果补充RocketMQ提供了3种模式的Producer:NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),对应的分别是普通消息、顺序消息和事务消息。在前面的博客当中,涉及的都是NormalProducer,调... 查看详情

一次rocketmq顺序消费延迟的问题定位(代码片段)

一次RocketMQ顺序消费延迟的问题定位问题背景与现象昨晚收到了应用报警,发现线上某个业务消费消息延迟了54s多(从消息发送到MQ到被消费的间隔):2021-06-30T23:12:46.756messageprocessingisincrediblydelayed!(Currentdelaytime:547... 查看详情

rocketmq(04)——发送顺序消息(代码片段)

发送顺序消息如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代码我们在发送消息的时候ÿ... 查看详情

9springboot整合rocketmq实现顺序消息(代码片段)

rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完... 查看详情

rocketmq专题2:三种常用生产消费方式(顺序广播定时)以及顺序消费源码探究(代码片段)

...行常用的三种消息类型例子展示的时候,我们先来说一说RocketMQ的几个重要概念:PullConsumer与PushConsumer:主要区别在于Pull与Push的区别。对于PullConsumer,消费者会主动从broker中拉取消息进行消费。而对于PushConsumer,会封装包含消... 查看详情

rocketmq(二十)消息消费重试机制(代码片段)

1、顺序消息的消费重试对于顺序消息,当Consumer消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。消费重试默认间隔时间为1000毫秒。重试期间应用会出现消息消费被阻塞的... 查看详情

四.rocketmq极简入门-rocketmq顺序消息发送(代码片段)

...消费,队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认生产者以RoundRobin轮询方式把消息发送到不同的Queu 查看详情

rocketmq使用顺序消息(代码片段)

目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情

rocketmq顺序消费

参考技术A对于所有的MQ来说,必问的一道面试题就是RocketMQ顺序消息怎样做?原理是什么?首先我们要明确什么顺序消费,顺序消费的定义是什么?我所理解的顺序消费,指的针对某一类消息,比如都是订单A的消息来说,它的消... 查看详情