rocketmq使用顺序消息(代码片段)

乐观男孩 乐观男孩     2022-12-12     349

关键词:

目录

说明

RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。
核心思想是:发送消息时,可以通过实现MessageQueueSelector接口,选择消息发送到哪个队列,从而保证某类数据的顺序性。同时,可以在send方法中指定入参,方便MessageQueueSelector接口内部根据入参选择指定的队列。

生产端

@Test
public void sendMessageOrderly() throws Exception 
    DefaultMQProducer defaultMQProducer = RocketMqUtil.getDefaultMQProducer();
    int arg = 3;
    for (int i = 0; i < 3; i++) 
        Message message = new Message(RocketMqUtil.TOPIC, "orderly", "顺序消息".getBytes(Charset.forName("UTF-8")));
        //为了保证消息顺序,则消息发送到同一个队列中,可通过MessageQueueSelector实现
        // 可以通过arg参数在内部协助计算发送到哪个队列
        defaultMQProducer.send(message, new MessageQueueSelector() 
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 
                //根据参数选定消息发送到哪个队列,确保同类消息在同一队列中,以确保消息是按顺序存放
                int index = ((int) arg) % mqs.size();
                log.info("队列数:,当前队列数:", mqs.size(), index);
                return mqs.get(index);
            
        , arg);
    
    defaultMQProducer.shutdown();

运行效果:

从上图可以看到,消息每次都是发送到同一个队列(编号为3的队列)。

RocketMQ上的消息:

消费端

@Test
public void consumeMessageOrderly () throws Exception 
    DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
    //消费监听器指定顺序消息监听器
    defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() 
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                   ConsumeOrderlyContext context) 
            log.info("消费到消息条数:", msgs.size());
            msgs.stream().map(messageExt -> new String(messageExt.getBody(), Charset.forName("UTF-8")))
                    .map(String::new).forEach(System.out::println);
            return ConsumeOrderlyStatus.SUCCESS;
        
    );
    defaultMQPushConsumer.start();
    Thread.sleep(5000L);
    defaultMQPushConsumer.shutdown();

消费端代码与普通消息消费唯一的不同是消息监听器要使用MessageListenerOrderly接口类消息监听器。

消费结果:

总结

1、生产端,通过实现MessageQueueSelector接口,选择某一类消息发送到同一个queue,由于同一个queue是有序的,所以消息也是有序的。
2、消费端,使用MessageListenerOrderly作为消息的监听器。

rocketmq使用顺序消息(代码片段)

目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情

深入理解rocketmq普通消息和顺序消息使用,原理,优化(代码片段)

...以这方面为主。这次打压的过程中收获比较的大的是,对RocketMq的一些优化。最开始我们公司使用的是RabbitMq,在一些流量高峰的场景下,发现队列堆积比较严重,导致RabbitMq挂了。为了应对这个场景,最终我们引入了阿里云的Rocke... 查看详情

rocketmq笔记:顺序消息(代码片段)

...是可以按照消息的发送顺序来消费(FIFO)。  顺序消息是RocketMQ提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ顺序消... 查看详情

rocketmq使用顺序消息(代码片段)

目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情

rocketmq的顺序消息(顺序消费)(代码片段)

简单介绍了消息有序性的概念,以及RocketMQ如何实现消息的顺序消费。文章目录1消息的有序性2生产者有序发送3消费者有序消费1消息的有序性消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订... 查看详情

解析rocketmq业务消息-“顺序消息”(代码片段)

引言ApacheRocketMQ诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ从创建之初就一直专注于业务集成领域的异步通信能... 查看详情

rocketmq(04)——发送顺序消息(代码片段)

发送顺序消息如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代码我们在发送消息的时候,调用的是... 查看详情

rocketmq有序消息(代码片段)

RocketMQ提供的顺序消费消息实现是使用的FIFO先进先出算法Producer消息发送publicclassProducerpublicstaticvoidmain(String[]args)throwsUnsupportedEncodingExceptiontryMQProducerproducer=newDefaultMQProducer("please_rename_unique_ 查看详情

四.rocketmq极简入门-rocketmq顺序消息发送(代码片段)

...消费,队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认生产者以RoundRobin轮询方式把消息发送到不同的Queu 查看详情

rocketmq顺序消费(代码片段)

部分内容出处  https://www.jianshu.com/p/453c6e7ff81crocketmq内部有4个默认的队里,在发送消息时,同一组的消息需要按照顺序,发送到相应的mq中,同一组消息按照顺序进行消费,不同组的消息可以并行的进行消费。下面看一下pr... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq(十四)顺序消息(代码片段)

1、什么是顺序消息顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。默认情况下生产者会把消息以RoundRobin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发... 查看详情

rocketmq(04)——发送顺序消息(代码片段)

发送顺序消息如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代码我们在发送消息的时候ÿ... 查看详情

rocketmq源码阅读----顺序消息(代码片段)

一、概述顺序消息的大致原理是发送的时候,比如同一个订单 id 的发送到同一个 queueId 中,如下单、支付、扣库存这个流程需要保证同一个订单 id 消息有序才能正常执行。在消费的时候,也只能有一个 consumer并... 查看详情

9springboot整合rocketmq实现顺序消息(代码片段)

rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完... 查看详情

rocketmq——ordermessage(顺序消息)(代码片段)

生产者端消费者端运行效果补充RocketMQ提供了3种模式的Producer:NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),对应的分别是普通消息、顺序消息和事务消息。在前面的博客当中,涉及的都是NormalProducer,调... 查看详情

聊一聊顺序消息(rocketmq顺序消息的实现机制)(代码片段)

当我们说顺序时,我们在说什么?日常思维中,顺序大部分情况会和时间关联起来,即时间的先后表示事件的顺序关系。比如事件A发生在下午3点一刻,而事件B发生在下午4点,那么我们认为事件A发生在事件B之前,他们的顺序关... 查看详情