rocketmq(04)——发送顺序消息(代码片段)

elim168 elim168     2022-11-29     609

关键词:

发送顺序消息

如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代码我们在发送消息的时候,调用的是需要传递MessageQueueSelector的send(),该方法还可以传递一个额外的参数,其对应MessageQueueSelector的select()的最后一个参数。下面代码中我们一共发送了10条消息,从1开始算顺序为奇数的都放到第一个队列中,顺序为偶数的都放第二个队列中。所以最终第一个队列放了顺序号为1/3/5/7/9的消息,第二个队列中放了顺序号为2/4/6/8/10的消息。

@Test
public void testOrderSend() throws Exception 
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  producer.setNamesrvAddr(this.nameServer);
  producer.start();
  for (int i=0; i<10; i++) 
    Message message = new Message("topic1", "tag3", (System.currentTimeMillis() + "---" + System.nanoTime() + "hello ordered message " + i).getBytes());
    SendResult sendResult = producer.send(message, new MessageQueueSelector() 
      @Override
      public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 
        int index = (int) arg;
        //奇数放一个队列,偶数放一个队列
        return mqs.get(index % mqs.size() % 2);
      
    , i);
    Assert.assertTrue(sendResult.getSendStatus() == SendStatus.SEND_OK);
  
  producer.shutdown();

在消费的时候如果需要确保队列中的消息是按照顺序消费的,注册消息监听器时不能再选择并发消费的MessageListenerConcurrently,而需要选择按顺序消费的MessageListenerOrderly。按顺序消费时每个线程会锁定当前队列,只有一条消息消费完了才会释放锁,这样确保同一队列同时只能有一个线程消费一条消息。而并发消费时是不会一直锁队列的。有序消费时同一个队列里面的消息会按照顺序进行消费,但是它们可能被不同的线程消费。如消息的顺序是1/2/3/4/5/6,则按照顺序消费可以保证消息的消费顺序一定是1/2/3/4/5/6,但是消费它们的线程有可能是线程6/5/4/3/2/1。如果要保证有序的消费是在同一个线程完成的,则消费者线程只能有一个,可以通过setConsumeThreadMax()定义消费线程的最大数,可以通过setConsumeThreadMin设置消费者线程的最小数。下面的代码中就定义了按照顺序进行消息的消费。

@Test
public void testOrderConsume() throws Exception 
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
  consumer.setNamesrvAddr(this.nameServer);
  consumer.subscribe("topic1", "tag3");
  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//默认值,订阅以前的消息将被忽略
  consumer.registerMessageListener(new MessageListenerOrderly() 
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) 
      System.out.println(Thread.currentThread().getName() + "消费消息:" + new String(msgs.get(0).getBody()));
      return ConsumeOrderlyStatus.SUCCESS;
    
  );
  consumer.start();
  TimeUnit.SECONDS.sleep(120);
  consumer.shutdown();

顺序消息消费时返回的ConsumeOrderlyStatus只能是SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。SUCCESS表示消费成功,可以继续消费下一条,而SUSPEND_CURRENT_QUEUE_A_MOMENT表示消费失败,需要等待一下再继续消费。它不能像并发消费那样跳过消费失败的消息,因为那样就破坏了消息消费的顺序性。
(注:本文是基于RocketMQ4.5.0所写)

rocketmq笔记:顺序消息(代码片段)

...是可以按照消息的发送顺序来消费(FIFO)。  顺序消息是RocketMQ提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ顺序消... 查看详情

rocketmq源码—十rocketmq顺序消息(代码片段)

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情

rocketmq源码—十rocketmq顺序消息(代码片段)

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情

四.rocketmq极简入门-rocketmq顺序消息发送(代码片段)

...消费,队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认生产者以RoundRobin轮询方式把消息发送到不同的Queu 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq(十四)顺序消息(代码片段)

1、什么是顺序消息顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。默认情况下生产者会把消息以RoundRobin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发... 查看详情

rocketmq的顺序消息(顺序消费)(代码片段)

简单介绍了消息有序性的概念,以及RocketMQ如何实现消息的顺序消费。文章目录1消息的有序性2生产者有序发送3消费者有序消费1消息的有序性消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订... 查看详情

rocketmq使用顺序消息(代码片段)

目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情

rocketmq顺序消费(代码片段)

部分内容出处  https://www.jianshu.com/p/453c6e7ff81crocketmq内部有4个默认的队里,在发送消息时,同一组的消息需要按照顺序,发送到相应的mq中,同一组消息按照顺序进行消费,不同组的消息可以并行的进行消费。下面看一下pr... 查看详情

rocketmq源码阅读----顺序消息(代码片段)

一、概述顺序消息的大致原理是发送的时候,比如同一个订单 id 的发送到同一个 queueId 中,如下单、支付、扣库存这个流程需要保证同一个订单 id 消息有序才能正常执行。在消费的时候,也只能有一个 consumer并... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

三.rocketmq极简入门-rocketmq普通消息发送(代码片段)

前言RocketMQ已经写了两章了,一章是RocketMQ认识和安装,一章是RocketMQ的工作流程和核心概念,本章我们开始使用RocketMQ来发送和接收消息。RocketMQ的消息种类非常多,比如:普通消息,顺序消息,延迟消... 查看详情

rocketmq-如何实现顺序消息

...新增消息、修改消息。如何发送和消费顺序消息我们使用RocketMQ顺序消息来模拟一下订单的场景,顺序消息分为两部分:顺序发送、顺序消费。1.顺序发消息上面代码模拟了按顺序依次发送创建、支付、退款消息到TopicTest中。在ap... 查看详情

rocketmq使用顺序消息(代码片段)

目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情

rocketmq有序消息(代码片段)

RocketMQ提供的顺序消费消息实现是使用的FIFO先进先出算法Producer消息发送publicclassProducerpublicstaticvoidmain(String[]args)throwsUnsupportedEncodingExceptiontryMQProducerproducer=newDefaultMQProducer("please_rename_unique_ 查看详情

9springboot整合rocketmq实现顺序消息(代码片段)

rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完... 查看详情

rocketmq—生产者客户端详解(代码片段)

...ACK与重试机制,消息的顺序生产,批量发送等。RocketMQ在具备这些特性的同时,有自己独有的特性。下面我们对RocketMQ的生产者开展讲解。一、消息发送1.同步发送消息同步发送是指消息发送方发出数据后,同步等... 查看详情