rocketmq-消息消费模式顺序消费(代码片段)

杨宸杨 杨宸杨     2023-03-03     386

关键词:

RocketMQ-消息消费模式 顺序消费


RocketMQ-消息消费模式

集群模式

在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到

集群模式的演示(本身就默认)

假设我们生产者生产了十条信息 ,当我们集群了两台消费者服务器的时候,就会每个服务器执行五条

Rocketmq存储队列

在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的
如果只有一个队列,保证线程安全,必须得给队列进行写操作的时候上锁。
多几个队列,降低并发度,等待时间就短一些。

为什么是四个队列?

因为大多数服务器只有四核,意味着同时最多只能有CPU同时工作

广播模式

在消费模式为集群的情况下,如果机器是集群的,消费是会给集群中的所有机器所消费到

public class Consumer 
    public static void main(String[] args) throws Exception 
        //定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
        //设置nameServer地址
        consumer.setNamesrvAddr("43.143.161.59:9876");
        //设置订阅的主题
        consumer.subscribe("helloTopic","*");
        //设置消费模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //设置消息的监听器
        consumer.setMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) 
                for(MessageExt msg:list)
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        //启动消费者
        consumer.start();
    

运行结果:

生产者发送了十条消息之后,消费者集群的每个服务器均收到十条数据


顺序消费

实现生产顺序:12345消费顺序12345
哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程

假设我们没有实现顺序消费的时候

创建生产者

1.创建实体类

@Setter
@Getter
public class OrderStep 
    private long orderId;
    private String desc;

    @Override
    public String toString() 
        return "OrderStep" +
                "orderId=" + orderId +
                ", desc='" + desc + '\\'' +
                '';
    

2.创建测试类

public class OrderUtil 
    public static List<OrderStep> buildOrders()
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        return orderList;
    

3.创建生产者类

public class Producer 
        public static void main(String[] args) throws Exception 
            DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");
            producer.setNamesrvAddr("43.143.161.59:9876");
            producer.start();
            String topic = "orderTopic";
            List<OrderStep> orderSteps = OrderUtil.buildOrders();
            for(OrderStep step:orderSteps)
                Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));
                producer.sendOneway(msg);
            
            producer.shutdown();
        

创建消费者类

public class Consumer 
    public static void main(String[] args) throws Exception 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");
        consumer.setNamesrvAddr("43.143.161.59:9876");
        consumer.subscribe("orderTopic","*");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) 
                for(MessageExt msg:list)
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        //启动消费者
        consumer.start();
    

运行结果:

可以看出和我们生产数据的顺序完全不同,整个订单的顺序都反了

如何改实现顺序消费

生产者类

public class Producer 
    public static void main(String[] args) throws Exception 
        DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");
        producer.setNamesrvAddr("43.143.161.59:9876");
        producer.start();
        String topic = "orderTopic";
        List<OrderStep> orderSteps = OrderUtil.buildOrders();
        //设置队列选择器
        MessageQueueSelector selector = new MessageQueueSelector() 
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) 
                System.out.println("队列个数"+list.size());
                Long orderId = (Long) o;
                int index = (int)(orderId % list.size());
                return list.get(index);
            
        ;
        for(OrderStep step:orderSteps)
            Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));
            //指定消息选择器,换入的参数
            producer.send(msg,selector,step.getOrderId());
        
        producer.shutdown();
    

消费者类

public class Consumer 
    public static void main(String[] args) throws Exception 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");
        consumer.setNamesrvAddr("43.143.161.59:9876");
        consumer.subscribe("orderTopic","*");
        //从什么地方开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //一个队列对应一个线程
        consumer.setMessageListener(new MessageListenerOrderly() 
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) 
                for(MessageExt msg:list)
                    System.out.println("当前线程:"+Thread.currentThread()+":,队列ID"+msg.getQueueId()+",消息内容:"+new String(msg.getBody(),Charset.defaultCharset()));
                
                return ConsumeOrderlyStatus.SUCCESS;
            
        );
        //启动消费者
        consumer.start();
    

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq顺序消费(代码片段)

部分内容出处  https://www.jianshu.com/p/453c6e7ff81crocketmq内部有4个默认的队里,在发送消息时,同一组的消息需要按照顺序,发送到相应的mq中,同一组消息按照顺序进行消费,不同组的消息可以并行的进行消费。下面看一下pr... 查看详情

rocketmq笔记:顺序消息(代码片段)

...是可以按照消息的发送顺序来消费(FIFO)。  顺序消息是RocketMQ提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ顺序消... 查看详情

rocketmq源码解析-消息消费(代码片段)

RocketMQ源码解析-消息消费1.消费者相关类2.消费者的启动3.消息的拉取4.消费者的负载均衡5.消息的消费6.消费进度管理看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地... 查看详情

rocketmq学习笔记:消息发送模式(代码片段)

这是本人学习的总结,主要学习资料如下马士兵教育目录1、消息发送模式2、消息消费模式3、顺序消息的消费和发送3.1、全局顺序3.2、部分顺序3.3、部分顺序代码样例3.3.1、依赖3.3.2、发送信息3.3.3、接受信息4、延时消息的消... 查看详情

rocketmq(二十)消息消费重试机制(代码片段)

1、顺序消息的消费重试对于顺序消息,当Consumer消费消息失败后,为了保证消息的顺序性,其会自动不断地进行消息重试,直到消费成功。消费重试默认间隔时间为1000毫秒。重试期间应用会出现消息消费被阻塞的... 查看详情

rocketmq(11)——消费者拉模式和推模式(代码片段)

消费者拉模式和推模式RocketMQ提供了两种消息的消费模式,拉模式和推模式。我们先来看一下拉模式消费的应用。拉模式消费拉模式消费使用的是DefaultMQPullConsumer,核心逻辑是先拿到需要获取消息的Topic对应的队列,然后依次从... 查看详情

rocketmq源码(22)—consumemessageorderlyservice顺序消费消息源码(代码片段)

基于RocketMQrelease-4.9.3,深入的介绍了ConsumeMessageOrderlyService顺序消费消息源码。此前我们学习了consumer消息的拉取流程源码:RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码RocketMQ源码(19)—Broker处理DefaultMQPushC... 查看详情

rocketmq(11)——消费者拉模式和推模式(代码片段)

消费者拉模式和推模式RocketMQ提供了两种消息的消费模式,拉模式和推模式。我们先来看一下拉模式消费的应用。拉模式消费拉模式消费使用的是DefaultMQPullConsumer,核心逻辑是先拿到需要获取消息的Topic对应的队列,... 查看详情

rocketmq——ordermessage(顺序消息)(代码片段)

生产者端消费者端运行效果补充RocketMQ提供了3种模式的Producer:NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),对应的分别是普通消息、顺序消息和事务消息。在前面的博客当中,涉及的都是NormalProducer,调... 查看详情

rocketmq如何保证消费顺序

参考技术A集群模式下锁队列保证消息被同一个consumer消费,往broker定时发送锁命令本地消费时不论集群模式和广播模式都会有本地队列锁进行锁定保证同一个队列只会同时被一个消费者线程锁定本地消费时也按队列进行锁定操作... 查看详情

一次rocketmq顺序消费延迟的问题定位(代码片段)

一次RocketMQ顺序消费延迟的问题定位问题背景与现象昨晚收到了应用报警,发现线上某个业务消费消息延迟了54s多(从消息发送到MQ到被消费的间隔):2021-06-30T23:12:46.756messageprocessingisincrediblydelayed!(Currentdelaytime:547... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq(十四)顺序消息(代码片段)

1、什么是顺序消息顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。默认情况下生产者会把消息以RoundRobin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发... 查看详情

rocketmq专题2:三种常用生产消费方式(顺序广播定时)以及顺序消费源码探究(代码片段)

...行常用的三种消息类型例子展示的时候,我们先来说一说RocketMQ的几个重要概念:PullConsumer与PushConsumer:主要区别在于Pull与Push的区别。对于PullConsumer,消费者会主动从broker中拉取消息进行消费。而对于PushConsumer,会封装包含消... 查看详情

rocketmq:消息ack机制源码解析(代码片段)

消息消费进度概述消费者消费消息过程中,为了避免消息的重复消费,应将消息消费进度保存起来,当其他消费者再对消息进行消费时,读取已消费的消息偏移量,对之后的消息进行消费即可。消息模式分为... 查看详情

rocketmq使用广播消息(代码片段)

目录说明生产端消费端总结说明RocketMQ消息模式主要有两种:(1)、MessageModel.CLUSTERING:集群模式。同一消费者组内的每个消费者,只消费到Topic的一部分消息,所有消费者消费的消息加起来就是Topic的所有... 查看详情