rocketmq(十四)顺序消息(代码片段)

爱是与世界平行 爱是与世界平行     2022-12-03     283

关键词:

1、什么是顺序消息

顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。

默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。

2、为什么需要顺序消息

例如,现在有TOPIC ORDER_STATUS (订单状态),其下有4个Queue队列,该Topic中的不同消息用于描述当前订单的不同状态。假设订单有状态:未支付、已支付、发货中、发货成功、发货失败。

根据以上订单状态,生产者从时序上可以生成如下几个消息:

订单T0000001:未支付 —> 订单T0000001:已支付 —> 订单T0000001:发货中 —> 订单T0000001:发货失败

消息发送到MQ中之后,Queue的选择如果采用轮询策略,消息在MQ的存储可能如下:

这种情况下,我们希望Consumer消费消息的顺序和我们发送是一致的,然而上述MQ的投递和消费方式,我们无法保证顺序是正确的。对于顺序异常的消息,Consumer即使设置有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。

基于上述的情况,可以设计如下方案:对于相同订单号的消息,通过一定的策略,将其放置在一个Queue中,然后消费者再采用一定的策略(例如,一个线程独立处理一个queue,保证处理消息的顺序性),能够保证消费的顺序性。

3、有序性分类

根据有序范围的不同,RocketMQ可以严格地保证两种消息的有序性:分区有序与全局有序。

3.1、全局有序

当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序, 称为全局有序。

在创建Topic时指定Queue的数量。有三种指定方式:

1)在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量

2)在RocketMQ可视化控制台中手动创建Topic时指定Queue数量

3)使用mqadmin命令手动创建Topic时指定Queue数量

3.2、分区有序

如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,则称为分区有序。

如何实现Queue的选择?在定义Producer时我们可以指定消息队列选择器,而这个选择器是我们自己实现了MessageQueueSelector接口定义的。

在定义选择器的选择算法时,一般需要使用选择key。这个选择key可以是消息key也可以是其它数据。但无论谁做选择key,都不能重复,都是唯一的。一般性的选择算法是,让选择key(或其hash值)与该Topic所包含的Queue的数量取模,其结果即为选择出的Queue的QueueId。

取模算法存在一个问题:不同选择key与Queue数量取模结果可能会是相同的,即不同选择key的消息可能会出现在相同的Queue,即同一个Consuemr可能会消费到不同选择key的消息。这个问题如何解决?一般性的作法是,从消息中获取到选择key,对其进行判断。若是当前Consumer需要消费的消息,则直接消费,否则,什么也不做。这种做法要求选择key要能够随着消息一起被Consumer获取到。此时使用消息key作为选择key是比较好的做法。

以上做法会不会出现如下新的问题呢?不属于那个Consumer的消息被拉取走了,那么应该消费该消息的Consumer是否还能再消费该消息呢?同一个Queue中的消息不可能被同一个Group中的不同Consumer同时消费。所以,消费同一个Queue的不同选择key的消息的Consumer一定属于不同的Group。而不同的Group中的Consumer间的消费是相互隔离的,互不影响的。

4、代码举例

public class OrderedProducer 
    public static void main(String[] args) throws Exception 
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        producer.start();
        for (int i = 0; i < 100; i++) 
            Integer orderId = i;
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("TopicA", "TagA", body);
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() 
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                
            , orderId);
            System.out.println(sendResult);
        
        producer.shutdown();
    

rocketmq源码—十rocketmq顺序消息(代码片段)

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情

rocketmq笔记:顺序消息(代码片段)

...是可以按照消息的发送顺序来消费(FIFO)。  顺序消息是RocketMQ提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ顺序消... 查看详情

rocketmq的顺序消息(顺序消费)(代码片段)

简单介绍了消息有序性的概念,以及RocketMQ如何实现消息的顺序消费。文章目录1消息的有序性2生产者有序发送3消费者有序消费1消息的有序性消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订... 查看详情

rocketmq使用顺序消息(代码片段)

目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情

解析rocketmq业务消息-“顺序消息”(代码片段)

引言ApacheRocketMQ诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ从创建之初就一直专注于业务集成领域的异步通信能... 查看详情

rocketmq(04)——发送顺序消息(代码片段)

发送顺序消息如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代码我们在发送消息的时候,调用的是... 查看详情

四.rocketmq极简入门-rocketmq顺序消息发送(代码片段)

...消费,队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认生产者以RoundRobin轮询方式把消息发送到不同的Queu 查看详情

rocketmq顺序消费(代码片段)

部分内容出处  https://www.jianshu.com/p/453c6e7ff81crocketmq内部有4个默认的队里,在发送消息时,同一组的消息需要按照顺序,发送到相应的mq中,同一组消息按照顺序进行消费,不同组的消息可以并行的进行消费。下面看一下pr... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq(04)——发送顺序消息(代码片段)

发送顺序消息如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代码我们在发送消息的时候ÿ... 查看详情

rocketmq源码阅读----顺序消息(代码片段)

一、概述顺序消息的大致原理是发送的时候,比如同一个订单 id 的发送到同一个 queueId 中,如下单、支付、扣库存这个流程需要保证同一个订单 id 消息有序才能正常执行。在消费的时候,也只能有一个 consumer并... 查看详情

springcloud-springcloud之stream构建消息驱动微服务框架;springcloudalibaba集成rocketmq(二十四)(代码片段)

阅读本文前可先参考SpringCloud-SpringCloud之Stream构建消息驱动微服务框架;RabbitMQ(十九)_MinggeQingchun的博客-CSDN博客_springcloudstream一、SpringCloudStream在微服务的开发过程中,会经常用到消息中间件,通过消息中间... 查看详情

深入理解rocketmq普通消息和顺序消息使用,原理,优化(代码片段)

...以这方面为主。这次打压的过程中收获比较的大的是,对RocketMq的一些优化。最开始我们公司使用的是RabbitMq,在一些流量高峰的场景下,发现队列堆积比较严重,导致RabbitMq挂了。为了应对这个场景,最终我们引入了阿里云的Rocke... 查看详情

9springboot整合rocketmq实现顺序消息(代码片段)

rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完... 查看详情

rocketmq——ordermessage(顺序消息)(代码片段)

生产者端消费者端运行效果补充RocketMQ提供了3种模式的Producer:NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),对应的分别是普通消息、顺序消息和事务消息。在前面的博客当中,涉及的都是NormalProducer,调... 查看详情

聊一聊顺序消息(rocketmq顺序消息的实现机制)(代码片段)

当我们说顺序时,我们在说什么?日常思维中,顺序大部分情况会和时间关联起来,即时间的先后表示事件的顺序关系。比如事件A发生在下午3点一刻,而事件B发生在下午4点,那么我们认为事件A发生在事件B之前,他们的顺序关... 查看详情

rocketmq入门到入土事务消息&顺序消息(代码片段)

接上一篇:RocketMQ入门到入土(一)新手也能看懂的原理和实战!一、事务消息的由来1、案例引用官方的购物案例:小明购买一个100元的东西,账户扣款100元的同时需要保证在下游的积分系统给小明这个账号增加100积分。账号系... 查看详情