rocketmq顺序消费(代码片段)

小~虎 小~虎     2022-11-05     446

关键词:

部分内容出处   https://www.jianshu.com/p/453c6e7ff81c

rocketmq内部有4个默认的队里,在发送消息时,同一组的消息需要按照顺序,发送到相应的mq中,同一组消息按照顺序进行消费,不同组的消息可以并行的进行消费。

下面看一下producer的代码:

package com.alibaba.rocketmq.example.message.order;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * @author : Jixiaohu
 * @Date : 2018-04-19.
 * @Time : 9:20.
 * @Description :
 */
public class Producer 
    public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException 
        String groupName = "order_producer";
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
        producer.start();

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String dateStr = sdf.format(new Date());
        try 
            for (int i = 1; i <= 3; i++) 
                String body = dateStr + "Hello RoctetMq : " + i;
                Message msg = new Message("Topic1", "Tag1", "Key" + i,
                        body.getBytes());
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() 
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) 
                        Integer id = (Integer) o;
                        return list.get(id);
                    
                , 0); //0是队列的下标
                System.out.println(sendResult);
            
            for (int i = 1; i <= 3; i++) 
                String body = dateStr + "Hello RoctetMq : " + i;
                Message msg = new Message("Topic1", "Tag1", "Key" + i,
                        body.getBytes());
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() 
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) 
                        Integer id = (Integer) o;
                        return list.get(id);
                    
                , 1); //1是队列的下标
                System.out.println(sendResult);
            
            for (int i = 1; i <= 3; i++) 
                String body = dateStr + "Hello RoctetMq : " + i;
                Message msg = new Message("Topic1", "Tag1", "Key" + i,
                        body.getBytes());
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() 
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) 
                        Integer id = (Integer) o;
                        return list.get(id);
                    
                , 2); //2是队列的下标
                System.out.println(sendResult);
            
            for (int i = 1; i <= 3; i++) 
                String body = dateStr + "Hello RoctetMq : " + i;
                Message msg = new Message("Topic1", "Tag1", "Key" + i,
                        body.getBytes());
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() 
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) 
                        Integer id = (Integer) o;
                        return list.get(id);
                    
                , 3); //3是队列的下标
                System.out.println(sendResult);
            
            for (int i = 1; i <= 3; i++) 
                String body = dateStr + "Hello RoctetMq : " + i;
                Message msg = new Message("Topic1", "Tag1", "Key" + i,
                        body.getBytes());
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() 
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) 
                        Integer id = (Integer) o;
                        return list.get(id);
                    
                , 4); //4是队列的下标
                System.out.println(sendResult);
            
            for (int i = 1; i <= 3; i++) 
                String body = dateStr + "Hello RoctetMq : " + i;
                Message msg = new Message("Topic1", "Tag1", "Key" + i,
                        body.getBytes());
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() 
                    @Override
                    public MessageQueue select(List<MessageQueue> list, Message message, Object o) 
                        Integer id = (Integer) o;
                        return list.get(id);
                    
                , 5); //5是队列的下标
                System.out.println(sendResult);
            
         catch (RemotingException e) 
            e.printStackTrace();
            Thread.sleep(1000);
        
        producer.shutdown();
    

这边发送多组消息,每组消息的顺序分别为1,2,3,

下面查看consumer1,和consumer2,因为要顺序消费,需要注意的是,这两个消费者的监听器是MessageListenerOrderly,两个的代码一样,我这边就只展示一个consumer的代码

package com.alibaba.rocketmq.example.message.order;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author : Jixiaohu
 * @Date : 2018-04-23.
 * @Time : 9:35.
 * @Description : 顺序消息消费
 */
public class Consumer1 

    public Consumer1() throws Exception 
        String groupName = "order_producer";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
        /**
         * 设置Consumer第一次启动是从队列头开始消费还是队列尾开始消费
         * 非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //订阅的主题,以及过滤的标签内容
        consumer.subscribe("Topic1", "*");
        //注册监听
        consumer.registerMessageListener(new Listener());
        consumer.start();
        System.out.println("Consumer1 Started.");
    

    class Listener implements MessageListenerOrderly 

        private Random random = new Random();

        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) 
            // 设置自动提交
            context.setAutoCommit(true);
            for (MessageExt msg : list) 
                System.out.println(msg + ",context" + new String(msg.getBody()));
            
            try 
                TimeUnit.SECONDS.sleep(random.nextInt(1));
             catch (InterruptedException e) 
                e.printStackTrace();
            


            return ConsumeOrderlyStatus.SUCCESS;
        
    

    public static void main(String[] args) throws Exception 
        new Consumer1();
    


还是按照先启动consumer的顺序,在启动producer的顺序。

这边看一下控制台的信息

总共6组消息,broker-a上接收到4组消息,broker-b上接收到2组消息,同一组的3条消息会发送到同一个broker的同一个队列中,这样才能保证顺序消费,

下面看一下consumer1和consumer2的打印结果

由于顺序消费只能单线程,一个线程只能去一个队列获取数据。

可以看到,这边的queueid都是3个 3个打印,不会出现交替,下面看一下一组消息的消费顺序,

 

可以看到,消息是按照发送的顺序,进行消费,consumer2的打印结果也是类似的,不过consumer2消费了6条消息。

 这样就实现了rocket的顺序消费,虽然实现了顺序消费,由于网络通信,会存在着重复数据的问题,

重复数据的问题,rocket不提供解决方案,让业务方自行解决,主要有两个方法:

  1. 消费端处理消息的业务逻辑保持幂等性
  2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

第1条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

下面把consumer修改成多线程的模式,再次查看一下运行的结果:

package com.alibaba.rocketmq.example.message.thread;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author : Jixiaohu
 * @Date : 2018-04-23.
 * @Time : 9:35.
 * @Description : 顺序消息消费
 */
public class Consumer 

    public Consumer() throws Exception 
        String groupName = "order_producer";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
        /**
         * 设置Consumer第一次启动是从队列头开始消费还是队列尾开始消费
         * 非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //最小线程数
        consumer.setConsumeThreadMin(10);

        //最大线程数
        consumer.setConsumeThreadMax(20);

        //订阅的主题,以及过滤的标签内容
        consumer.subscribe("Topic1", "*");
        //注册监听
        consumer.registerMessageListener(new Listener());
        consumer.start();
        System.out.println("Consumer Started.");
    

    class Listener implements MessageListenerConcurrently 

        private Random random = new Random();

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) 

            for (MessageExt msg : list) 
                System.out.println(msg + ",context" + new String(msg.getBody()));
            
            try 
                TimeUnit.SECONDS.sleep(random.nextInt(1));
             catch (InterruptedException e) 
                e.printStackTrace();
            


            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        
    

    public static void main(String[] args) throws Exception 
        new Consumer();
    


 同样先启动consumer,在启动producer,查看一下打印结果:

从打印结果,可以看出consumer消费并不能保证严格的顺序,多线程和顺序,只能保证其中的一个。producer能保证消息发送的顺序,不能保证消息消费的顺序,要保证消息消费的顺序,consumer端必须实现 MessageListenerOrderly 接口。

 

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

一次rocketmq顺序消费延迟的问题定位(代码片段)

一次RocketMQ顺序消费延迟的问题定位问题背景与现象昨晚收到了应用报警,发现线上某个业务消费消息延迟了54s多(从消息发送到MQ到被消费的间隔):2021-06-30T23:12:46.756messageprocessingisincrediblydelayed!(Currentdelaytime:547... 查看详情

rocketmq笔记:顺序消息(代码片段)

...是可以按照消息的发送顺序来消费(FIFO)。  顺序消息是RocketMQ提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ顺序消... 查看详情

rocketmq专题2:三种常用生产消费方式(顺序广播定时)以及顺序消费源码探究(代码片段)

...行常用的三种消息类型例子展示的时候,我们先来说一说RocketMQ的几个重要概念:PullConsumer与PushConsumer:主要区别在于Pull与Push的区别。对于PullConsumer,消费者会主动从broker中拉取消息进行消费。而对于PushConsumer,会封装包含消... 查看详情

rocketmq-消息消费模式顺序消费(代码片段)

RocketMQ-消息消费模式顺序消费RocketMQ-消息消费模式集群模式集群模式的演示(本身就默认)Rocketmq存储队列广播模式顺序消费如何改实现顺序消费RocketMQ-消息消费模式集群模式在消费模式为集群的情况下,如果机器是集群的,消息只会... 查看详情

rocketmq源码(22)—consumemessageorderlyservice顺序消费消息源码(代码片段)

基于RocketMQrelease-4.9.3,深入的介绍了ConsumeMessageOrderlyService顺序消费消息源码。此前我们学习了consumer消息的拉取流程源码:RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码RocketMQ源码(19)—Broker处理DefaultMQPushC... 查看详情

rocketmq(十四)顺序消息(代码片段)

1、什么是顺序消息顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。默认情况下生产者会把消息以RoundRobin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发... 查看详情

rocketmq(二十)消息消费重试机制(代码片段)

1、顺序消息的消费重试对于顺序消息,当Consumer消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。消费重试默认间隔时间为1000毫秒。重试期间应用会出现消息消费被阻塞的... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

9springboot整合rocketmq实现顺序消息(代码片段)

rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完... 查看详情

rocketmq源码阅读----顺序消息(代码片段)

一、概述顺序消息的大致原理是发送的时候,比如同一个订单 id 的发送到同一个 queueId 中,如下单、支付、扣库存这个流程需要保证同一个订单 id 消息有序才能正常执行。在消费的时候,也只能有一个 consumer并... 查看详情

rocketmq(04)——发送顺序消息(代码片段)

发送顺序消息如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代码我们在发送消息的时候,调用的是... 查看详情

rocketmq——ordermessage(顺序消息)(代码片段)

生产者端消费者端运行效果补充RocketMQ提供了3种模式的Producer:NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),对应的分别是普通消息、顺序消息和事务消息。在前面的博客当中,涉及的都是NormalProducer,调... 查看详情

四.rocketmq极简入门-rocketmq顺序消息发送(代码片段)

...消费,队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认生产者以RoundRobin轮询方式把消息发送到不同的Queu 查看详情

rocketmq(04)——发送顺序消息(代码片段)

发送顺序消息如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代码我们在发送消息的时候ÿ... 查看详情

rocketmq使用顺序消息(代码片段)

目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情

springboot整合rocketmq的各种消息类型,生产者,消费者(代码片段)

...顺序消息消费者sql92过滤消息消费者事物消息消费者@RocketMQMessageListener参数解释我的rocketmq各种集群方案安装Springboot整合使用pom依赖<dependency><groupId>org.apache.roc 查看详情