rabbitmq学习笔记五:rabbitmq之优先级消息队列

wuzhiyuan wuzhiyuan     2022-08-31     264

关键词:

RabbitMQ优先级队列注意点:

1、只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效

2、RabbitMQ3.5以后才支持优先级队列

代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做了少许改变,改变的代码如下:

消费者 spring-config.xml(还需要增加一个QueueListener监听器,代码就不复制到这里了,可以参考项目中的其他监听器)

<!-- ========================================RabbitMQ========================================= -->
    <!-- 连接工厂 -->
    <rabbit:connection-factory id="connectionFactory" host="localhost" publisher-confirms="true" virtual-host="/" username="guest" password="guest" />
    <!-- 监听器 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <!-- queues是队列名称,可填多个,用逗号隔开, method是ref指定的Bean调用Invoke方法执行的方法名称 -->
        <rabbit:listener queues="red" method="onMessage" ref="redQueueListener" />
        <rabbit:listener queues="blue" method="onMessage" ref="blueQueueListener" />
        <rabbit:listener queues="queue" method="queueList" ref="queueListener" />
    </rabbit:listener-container>
    <!-- 队列声明 -->
    <rabbit:queue name="red" durable="true" />
    <rabbit:queue name="blue" durable="true" />
    <rabbit:queue name="queue" durable="true" />
    <!-- 红色监听处理器 -->
    <bean id="redQueueListener" class="com.aitongyi.customer.RedQueueListener" />
    <!-- 颜色监听处理器 -->
    <bean id="blueQueueListener" class="com.aitongyi.customer.BlueQueueListener" />
    <!-- 优先级队列监听处理器 -->
    <bean id="queueListener" class="com.aitongyi.customer.QueueListener" />

生产者增加一个主方法:

public static void main(String[] args)
    {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); // 必须要设置

        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        for (final int i : priority)
        {
            template.convertAndSend("queue", (Object) ("queue" + i), new MessagePostProcessor() {

                @Override
                public Message postProcessMessage(Message arg0) throws AmqpException
                {
                    arg0.getMessageProperties().setPriority(i);
                    return arg0;
                }
            });
        }
    }

当然,还需要在客户端创建一个优先级队列:

注意,x-max-length = 9999,这个值最后不要写太大了,否则你电脑的内存会一直处于100%的使用状态(至于出现这种状况怎么解决,请点击:RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)),并且这个取值范围在0~255之间,超过了可能会出现问题,我测试了设置9999时,部分有问题,后面会把有问题的地方贴上来。

好了,所有的事情准备完毕了,我们先启动消费者,然后再运行主方法,此时的队列优先级设置为,private static final int[] priority = { 1, 5, 1, 2, 3, 4, 5, 5, 0, 3, 6, 10, 4, 100, 99, 98 };执行结果如下:

2017-05-16 09:51:48 399 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:51:48 417 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:51:48 435 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:51:48 493 [INFO] c.a.c.QueueListener - queueList Receved:queue2
2017-05-16 09:51:48 514 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:51:48 596 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:51:48 950 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:51:48 975 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:51:49 015 [INFO] c.a.c.QueueListener - queueList Receved:queue0
2017-05-16 09:51:49 039 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:51:49 058 [INFO] c.a.c.QueueListener - queueList Receved:queue6
2017-05-16 09:51:49 084 [INFO] c.a.c.QueueListener - queueList Receved:queue10
2017-05-16 09:51:49 102 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:51:49 140 [INFO] c.a.c.QueueListener - queueList Receved:queue100
2017-05-16 09:51:49 561 [INFO] c.a.c.QueueListener - queueList Receved:queue99
2017-05-16 09:51:49 595 [INFO] c.a.c.QueueListener - queueList Receved:queue98

很奇怪,没有按照优先级执行?文章前面已经提到,只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。所以,我们需要先将所有的消息队列发送至服务器,然后再启动消费者处理消息,这样,就可以看到效果了。为了达到这个目的,则需要先运行主方法,然后再启动消费者,队列优先级设置和上方一样,执行结果如下:

2017-05-16 09:56:23 296 [INFO] c.a.c.QueueListener - queueList Receved:queue100
2017-05-16 09:56:23 359 [INFO] c.a.c.QueueListener - queueList Receved:queue99
2017-05-16 09:56:23 438 [INFO] c.a.c.QueueListener - queueList Receved:queue98
2017-05-16 09:56:23 500 [INFO] c.a.c.QueueListener - queueList Receved:queue10
2017-05-16 09:56:23 594 [INFO] c.a.c.QueueListener - queueList Receved:queue6
2017-05-16 09:56:23 656 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:56:23 765 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:56:23 874 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:56:24 077 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:56:24 202 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:56:24 262 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:56:24 311 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:56:24 389 [INFO] c.a.c.QueueListener - queueList Receved:queue2
2017-05-16 09:56:24 422 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:56:24 500 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:56:24 547 [INFO] c.a.c.QueueListener - queueList Receved:queue0

这样,优先级队列就实现了,至于相同优先级的队列,执行顺序应该是随机的吧,我也没有测试,有兴趣的同学可以自己研究。

附:队列优先级设置为:private static final int[] priority = { 1, 5, 1, 2, 9998, 3, 4, 5, 999, 5, 0, 3, 6, 10, 4, 1000, 9999, 100, 99, 98, 899 };运行结果为:

2017-05-16 09:59:29 839 [INFO] c.a.c.QueueListener - queueList Receved:queue1000
2017-05-16 09:59:29 917 [INFO] c.a.c.QueueListener - queueList Receved:queue999
2017-05-16 09:59:29 995 [INFO] c.a.c.QueueListener - queueList Receved:queue899
2017-05-16 09:59:30 073 [INFO] c.a.c.QueueListener - queueList Receved:queue100
2017-05-16 09:59:30 182 [INFO] c.a.c.QueueListener - queueList Receved:queue99
2017-05-16 09:59:30 275 [INFO] c.a.c.QueueListener - queueList Receved:queue98
2017-05-16 09:59:30 353 [INFO] c.a.c.QueueListener - queueList Receved:queue9999
2017-05-16 09:59:30 431 [INFO] c.a.c.QueueListener - queueList Receved:queue9998
2017-05-16 09:59:30 509 [INFO] c.a.c.QueueListener - queueList Receved:queue10
2017-05-16 09:59:30 619 [INFO] c.a.c.QueueListener - queueList Receved:queue6
2017-05-16 09:59:30 670 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:59:30 733 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:59:30 826 [INFO] c.a.c.QueueListener - queueList Receved:queue5
2017-05-16 09:59:31 138 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:59:31 282 [INFO] c.a.c.QueueListener - queueList Receved:queue4
2017-05-16 09:59:31 316 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:59:31 379 [INFO] c.a.c.QueueListener - queueList Receved:queue3
2017-05-16 09:59:31 410 [INFO] c.a.c.QueueListener - queueList Receved:queue2
2017-05-16 09:59:31 472 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:59:31 712 [INFO] c.a.c.QueueListener - queueList Receved:queue1
2017-05-16 09:59:31 745 [INFO] c.a.c.QueueListener - queueList Receved:queue0

恩,排序结果是有点乱。。。

rabbitmq学习(下)——发布确认高级幂等性优先级惰性和rabbitmq集群(代码片段)

RabbitMQ学习(下)——发布确认高级、幂等性、优先级、惰性和RabbitMQ集群一、发布确认高级1.1confirm确认模式1.2return退回模式1.3备份交换机二、幂等性、优先级、惰性2.1幂等性2.2优先级2.3惰性三、RabbitMQ集群3.1RabbitMQ集群... 查看详情

rabbitmq学习笔记(自用)(代码片段)

...概念1.1什么是MQ1.2为什么要用MQ1.3MQ的分类1.4MQ的选择二、RabbitMQ2.1RabbitMQ的概念2.2四大核心概念三、简单案例3.1WorkQueues3.2轮训分发消息3.3消息应答3.4RabbitMQ持久化3.5不公平分发3.6预取值分发四、RabbitMQ-发布确认4.1发布确认逻辑4.2发... 查看详情

分布式消息中间件之rabbitmq学习笔记[一](代码片段)

...,陆续的整理一些中间件的笔记今天和小伙伴们分享RabbitMQ相关笔记博文偏理论,内容涉及:RabbitMQ的简单介绍AMQP协议标准介绍RabbitMQDemo食用方式:了解生产者消费者模式通过本文,对RabbitMQ有大概认识户外依然大... 查看详情

分布式消息中间件之rabbitmq学习笔记[一](代码片段)

...,陆续的整理一些中间件的笔记今天和小伙伴们分享RabbitMQ相关笔记博文偏理论,内容涉及:RabbitMQ的简单介绍AMQP协议标准介绍RabbitMQDemo食用方式:了解生产者消费者模式通过本文,对RabbitMQ有大概认识户外依然大... 查看详情

springboot与rabbitmq上手之消息超时时间、队列消息超时时间(五)

参考技术A目的主要是学习RabbitMQ的消息超时时间、队列消息超时时间,大概会简单介绍学习为主:毕竟还是要来演示Springboot整合RabbitMQ注解的方式来使用。每条消息的过期时间不同,如果要删除所有过期消息,势必要扫描整个队... 查看详情

rabbitmq笔记springboot整合rabbitmq之simple容器(消费者)(代码片段)

...依赖三、配置类3.1基础配置3.2交换机、路由、队列配置3.3RabbitMq配置(Jackson2JsonMessageConverter序列化对象)四、消费者(注意要点要看)五、配置文件未完待续一、简介  消息中间件具有一系列功能如低耦合、可靠投递、广... 查看详情

消息中间件学习笔记——rabbitmq

...性降低系统复杂度提高一致性问题三、常见的MQ产品四、RabbitMQ中的概念五、如何实现RabbitMQ的延迟对列一、消息中间件概述 通过下图我们就可以很好理解消息中间件的概念。二、消息中间件(MQ)的优劣(1).MQ的优势应用... 查看详情

rabbitmq学习笔记六:话题(topics)(代码片段)

前一篇我们已经改善了我们的日志系统,不用fanout类型的转发器愚蠢的广播消息,而是使用direct类型的转发器,能够选择性的接收我们想要接收的消息。参见Rabbit学习笔记五:路由选择(Routing)http://blog.cs... 查看详情

rabbitmq学习笔记(代码片段)

视频教程【编程不良人】MQ消息中间件之RabbitMQ以及整合SpringBoot2.x实战教程1.MQ引言1.1什么是MQMQ(MessageQuene):消息队列,也叫消息中间件。通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费... 查看详情

rabbitmq学习笔记五:路由选择(routing)(代码片段)

前面已经学习了rabbitmq的消息的发布(publish)和订阅(subscrible),参见http://blog.csdn.net/u010416588/article/details/54667952一路由选择(Routing)前面章节我们创建了一个简单的日志系统,我们可以广播日志消息... 查看详情

rabbitmq学习笔记五:路由选择(routing)(代码片段)

前面已经学习了rabbitmq的消息的发布(publish)和订阅(subscrible),参见http://blog.csdn.net/u010416588/article/details/54667952一路由选择(Routing)前面章节我们创建了一个简单的日志系统,我们可以广播日志消息... 查看详情

rabbitmq消息队列笔记(代码片段)

...发布确认高级在生产环境中由于一些不明原因,导致RabbitMQ 重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。如何才能进行RabbitMQ的消息可靠投递呢?特别是在比较极端 查看详情

rabbitmq消息队列笔记(代码片段)

...发布确认高级在生产环境中由于一些不明原因,导致RabbitMQ 重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。如何才能进行RabbitMQ的消息可靠投递呢?特别是在比较极端 查看详情

rabbitmq学习笔记

一环境搭建下载地址:ERLANG  http://www.erlang.org/downloadsMQhttps://github.com/rabbitmq/rabbitmq-server/releases/tag/rabbitmq_v3_6_9首先,由于RabbitMQ使用Erlang编写的,需要运行在Erlang运行时环境上,所以在安装RabbitMQServer之前需要安 查看详情

rabbitmq学习笔记(代码片段)

目录一、引⾔二、RabbitMQ介绍三、RabbitMQ安装四、RabbitMQ架构1.官⽅的简单架构图2.RabbitMQ的完整架构图3.查看图形化界⾯并创建⼀个VirtualHost五、RabbitMQ的队列模式1.RabbitMQ的通讯⽅式2.HelloWorld模式-简单队列模式3.work队列模式:... 查看详情

rabbitmq学习笔记2基本概念

官网:http://www.rabbitmq.com 参考:http://blog.csdn.net/column/details/rabbitmq.html 1基本概念rabbitmqserver(brokerserver):rabbitmq服务client:包括producers和consumermessage:包括payload和labelexchange:produce 查看详情

rabbitmq学习笔记

rabbitMQ学习笔记2017年12月31日星期日Lee 环境:centos7版本:rabbitmq-server-3.7.2-1 准备了3台主机做实验。先配置hosts如下。cat/etc/hosts192.168.5.71 node1192.168.5.72 node72192.168.5.73 node730、基础知识建议看下美团分享的ra 查看详情

rabbitmq学习笔记

1基本概念rabbitmqserver(brokerserver):rabbitmq服务client:包括producers和consumermessage:包括payload和labelexchange:producer发布message的地方queue:messages存放和consumer收取message的地方bindings:将message从exchange到不同queue的实现con 查看详情