mq4万字保姆教程|rabbitmq知识点整理与springboot整合附demo(图文并茂)(代码片段)

BugGuys BugGuys     2023-01-08     557

关键词:

文章参考

导读

【4万字保姆级教程】本文详细的从应用层面上讲解了RabbitMQ的使用以及整合Springboot;对于其概念进行讲解,提供了可以完成日常开发的接口与demo;

工作队列

1. work Queues

工作队列(又称任务队列)的主要目的是避免立即执行资源密集型任务,且等待执行完成。我们可以将任务放入队列中,后台运行的工作进程将任务取出并执行,当有多个工作线程时,这些线程将一起处理任务。

轮询分发消息

一个生产者发送到一个队列中,且由多个工作线程去处理。一个消息只能被处理一次,多个工作线程是竞争的关系。

一个生产者: 发送10条消息

public class Task 
    public static final String Queue_Name="hello";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        // 生成一个队列
        /*
        String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        参数1:队列名
        参数2:队列消息是否持久化,默认false存储在内存中
        参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
        参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
        参数5:其他参数
         */
        channel.queueDeclare(Queue_Name, false, false, false, null);
        // 发消息
        String message = "hello world";
        /*
        参数1:交换机名称,默认""
        参数2:路由的Key值是哪个  本次是队列名称
        参数3:其他参数
        参数4:消息体
         */
        for (int i = 0; i < 10; i++) 
            channel.basicPublish("", Queue_Name, null, (message+"_"+i).getBytes());
        
        System.out.println("消息发送完毕");
    


启动3个消费者

public class Worker 
    public static final String Queue_Name = "hello";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
         /*
        参数1:消费哪个队列
        参数2:消费成功后是否要自动应答 true自动应答,false手动应答
        参数3:消息成功消费的回调
        参数4:消息消费异常的回调
         */
        channel.basicConsume(Queue_Name, true, (consumerTag, message)-> 
            System.out.println(args[0]+"tag:"+consumerTag+" message:"+new String(message.getBody()));
        ,(consumerTag)->
            System.out.println(args[0]+"消息被消费中断:"+consumerTag);
        );
    

执行结果(轮询非有序)

工作线程2:----->tag:amq.ctag-h_N1sFd1t5stShV0o0R95A message:hello world_0
工作线程1:----->tag:amq.ctag-m9qZB0mWDxN2G0-ZOGjqWA message:hello world_2
工作线程0:----->tag:amq.ctag-El8AyGbfTrdvlQ7C4cw3NQ message:hello world_1
工作线程1:----->tag:amq.ctag-m9qZB0mWDxN2G0-ZOGjqWA message:hello world_5
工作线程2:----->tag:amq.ctag-h_N1sFd1t5stShV0o0R95A message:hello world_3
工作线程1:----->tag:amq.ctag-m9qZB0mWDxN2G0-ZOGjqWA message:hello world_8
工作线程0:----->tag:amq.ctag-El8AyGbfTrdvlQ7C4cw3NQ message:hello world_4
工作线程2:----->tag:amq.ctag-h_N1sFd1t5stShV0o0R95A message:hello world_6
工作线程0:----->tag:amq.ctag-El8AyGbfTrdvlQ7C4cw3NQ message:hello world_7
工作线程2:----->tag:amq.ctag-h_N1sFd1t5stShV0o0R95A message:hello world_9

2. 消息应答

概念

若工作线程突发异常中断,那么我们可能将丢失正在处理的消息。因此mq引入了一种消息应答机制,保证消费者在处理消息后,告诉MQ已经处理,可以将消息删除。

自动应答

消息发送后立即被认为已经发送成功。

需要在高吞吐量和数据传输安全性方面做权衡,仅适用在消费者可以高效以某种速率处理消息的情况下使用;

该模式可能因为消费者channel关闭造成消息丢失以及未对发送消息数量进行限制导致消息发送过载等风险;

手动应答

推荐手动应答

Channel.basicAck(long tag, boolean multiple) // 肯定确认 mq确定消息成功处理,可以删除
Channel.basicNack(long tag, boolean multiple, boolean requeue) // 不确定 mq不确定消息是否处理
Channel.basicReject(long tag, boolean requeue) // 不处理消息,直接拒绝,直接丢弃,不能批量处理

tag:消息标识;message.getEnvelope().getDeliveryTag();

multiple的true和false,见下面的代码

​ true:批量应答Channel上未应答的消息,当channel上传tag送的消息是1,2,3,4,当tag=4被确认时,1-3也会被确认收到消息应答;

​ false:只会应答当前消息,其余消息不会被应答;

requeue:true,拒绝的消息重新入消费队列;

消息自动重新入队

如果消费者异常断开,导致消息未发送ACK确认,MQ将其消息重新排队,很快发送给另一个消费者,保证消息的不丢失;

// 手动确认代码
 channel.basicConsume(Queue_Name, false, (consumerTag, message)-> 
                System.out.println(args[0]+"tag:"+consumerTag+" message:"+new String(message.getBody()));
                /*
                应答信道消息
                参数2:是否批量确认
                 */
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            ,(consumerTag)->
                System.out.println(args[0]+"消息被消费中断:"+consumerTag);
            );

3. RabbitMQ持久化

默认情况下,RabbitMQ因异常导致未成功消费的消息丢弃,若需要持久化需要将队列和消息都标记为持久化;

设置队列持久化
  // 生成一个队列
        /*
        String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        参数1:队列名
        参数2:队列消息是否持久化,默认false存储在内存中
        参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
        参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
        参数5:其他参数

         */
        channel.queueDeclare(Queue_Name, true, false, false, null);

注意:若原先声明的队列不是持久化的,启动会报错,需要将原先队列删除后重新创建持久化队列。

设置消息持久化
// 第三个参数:MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("", Queue_Name, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+"_"+i).getBytes());

将消息持久化并不能完全保证消息不丢失,因为消息当准备存储在磁盘的时候还未完全写入,消息还在缓存时的一个间隔点,此时并没有真正写入磁盘,持久性无法完全保证,需要参考后续的确认。

不公平分发

RabbitMQ默认情况是使用轮询消费消息,但如果多个消费者的处理速度不一致,就会导致慢的消费者影响到了快的消费者执行,因此需要有一种能者多劳的分发方式。

设置参数:默认是0,表示轮询分发;作用于手动确认的消费者;

设置prefetchCount = 3。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理3个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它;

int prefetchCount = 1;// 预取值,会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
channel.basicQos(prefetchCount);

代码:默认发送10条消息,用两个(一快一慢)消费者消费;

 	private static void createFasterConsumer() throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        // 默认0是轮询
        channel.basicQos(1);// 预取值为1,表示预取值1个消息
        // 手动应答1s
        channel.basicConsume(Queue_Name, false, (consumerTag, message)-> 
            System.out.println("快的tag:"+consumerTag+" message:"+new String(message.getBody()));
            ThreadUtils.sleep(1);
                /*
                应答信道消息
                 */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        ,(consumerTag)->
            System.out.println("快的消息被消费中断:"+consumerTag);
        );
    


    private static void createSlowerConsumer() throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        // 默认0是轮询
        channel.basicQos(1);// 预取值为1,表示预取值1个消息
        // 手动应答10s
        channel.basicConsume(Queue_Name, false, (consumerTag, message)-> 
            System.out.println("慢的tag:"+consumerTag+" message:"+new String(message.getBody()));
            ThreadUtils.sleep(10);
                /*
                应答信道消息
                 */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        ,(consumerTag)->
            System.out.println("慢的消息被消费中断:"+consumerTag);
        );
    

执行结果:证明快的消费者消费数量多

快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_0
慢的tag:amq.ctag-pvROpH3ChDTW8gl0qnbU1A message:hello world_1
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_2
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_3
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_4
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_5
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_6
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_7
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_8
快的tag:amq.ctag-4PELKAzpPIID4-vWT2BJkw message:hello world_9
预取值

由于消息本身发送是异步的,消息的接收处理也是异步的。因此消费者就有一个未确认的消息缓冲区。避免消息堆积在缓冲区中,无法消费。我们可以通过channel.basicQos(N)来设置未确认的消息缓冲区的大小。该值定义通道上允许的未确认消息的最大数量。一旦数量达到了配置的数量,MQ将停止继续向该channel发送消息,除非收到了消息确认。通常,增加预取值将提高向消费者传递消息的速度,虽然自动应答传输消息的速率是最佳的,但是,该情况下已传递但尚未处理的消息数量也会增加,从而增加了消费则的RAM消耗(随机存储器)。也就是消费者内存的消耗。

举例,同样上面的场景代码,默认发送10条消息,用两个(一快一慢)消费者消费;快的basicQos(1),慢的basicQos(6),可以观察到,慢的缓冲区确实有6条,快的消费了4条;

快的tag:amq.ctag-MGJ91J8ivK-fmk0E4e0mvw message:hello world_0
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_1
快的tag:amq.ctag-MGJ91J8ivK-fmk0E4e0mvw message:hello world_7
快的tag:amq.ctag-MGJ91J8ivK-fmk0E4e0mvw message:hello world_8
快的tag:amq.ctag-MGJ91J8ivK-fmk0E4e0mvw message:hello world_9
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_2
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_3
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_4
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_5
慢的tag:amq.ctag-pWo9dDAvlWd-osXuY8rXQg message:hello world_6

4. 发布确认

生产者将信道设置成confirm模式,在该模式下,信道上的所有消息都会有一个唯一的UID(从1递增)。当消息投递到所有匹配的队列时,broker 就会发送包含UID的确认消息给生产者,让生产者知道消息正确到达目的队列了;

channel.confirmSelect();

若消息和队列是持久化的,那么确认消息会在写入磁盘后发出,回传确认消息的delivery-tag域包含UID;

broker 也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;

优势:

  1. confirm模式是异步的,一旦发布一条消息,生产者可以等待ack的同时发送下一条消息,当这条消息最终收到确认后,生产者可以通过回调方法来确认该消息;
  2. 若因为MQ的问题导致消息丢失,就会发送一条nack消息,生产者同样可以通过回调处理nack消息;
单个确认发布

是一种同步确认发布的方式,当一个消息发布后收到确认了才会继续发布后续消息。waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间内没有返回则会抛异常。

缺点:发布速度慢,当一条消息没有确认发布,就会阻塞后面的消息发布;

实验:使用单个确认发布,发布1000条消息 最终耗时 3350ms;

public static void publishMessageBySync() throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        // 开启发布确认
        channel.confirmSelect();
        // 生成一个队列
        /*
        String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        参数1:队列名
        参数2:队列消息是否持久化,默认false存储在内存中
        参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
        参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
        参数5:其他参数
         */
        channel.queueDeclare(Queue_Name, false, false, false, null);
        // 发消息
        String message = "hello world";
        long start = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) 
            /*
            参数1:交换机名称,默认""
            参数2:路由的Key值是哪个  本次是队列名称
            参数3:其他参数
            参数4:消息体
             */
            channel.basicPublish("", Queue_Name, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+"_"+i).getBytes());
            boolean b = channel.waitForConfirms();
            if (b)
                System.out.println("发送成功");
             else 
                System.out.println("-----发送失败------");
            
        
        long end = System.currentTimeMillis();
        System.out.println("消息发送完毕 "+ (end - start));
    

批量确认发布

与单个确认发布相比,批量确认发布可以极大的提高吞吐量;

缺点:当发生故障导致确认发布出现问题时,无法确定是哪一条消息出现了故障;

​ 该方案仍是同步的,依旧会出现阻塞发布消息;

实验:生产者发送1000个消息,每100次确认一回,总共耗时62ms

public static void publishMessageByBatch() throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        // 开启发布确认
        channel.confirmSelect();
        // 生成一个队列
        /*
        String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        参数1:队列名
        参数2:队列消息是否持久化,默认false存储在内存中
        参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
        参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
        参数5:其他参数
         */
        channel.queueDeclare(Queue_Name, false, false, false, null);
        // 发消息
        String message = "hello world";
        long start = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) 
            /*
            参数1:交换机名称,默认""
            参数2:路由的Key值是哪个  本次是队列名称
            参数3:其他参数
            参数4:消息体
             */
            channel.basicPublish("", Queue_Name, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+"_"+i).getBytes());
            if (i % 100 == 0) 
                boolean b = channel.waitForConfirms();
                if (b)
                    System.out.println("发送成功");
                 else 
                    System.out.println("-----发送失败------");
                
            
        
        long end = System.currentTimeMillis();
        System.out.println("消息发送完毕 "+ (end - start));
    

异步确认发布

利用回调函数来达到消息的可靠传递。通过中间件也是通过函数回调来保证是否投递成功;他将消息放入一个容器中,每个消息都有一个UID,每次异步确认发布,可以保证消息是否成功确认发布;

通常,我们需要通过使用并发容器ConcurrentSkipListMap存储所有发送的消息,然后每次确认后将容器中的对应消息删除,剩下的消息就是未确认的消息;

实验:生产者发送1000条消息,异步监听确认发布,耗时36ms。通过日志发现,每次发送deliveryTag都从1开始,并且multiple同时存在true和false;

 public static void publishMessageByAsync() throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        // 开启发布确认
        channel.confirmSelect();
        // 生成一个队列
        /*
        String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        参数1:队列名
        参数2:队列消息是否持久化,默认false存储在内存中
        参数3:是否进行消息的共享,true可多个共同消费,false是只能单一消费者消费
        参数4:是否自动删除最后一个消费者断开连接后,该队列是否自动删除,true自动删除
        参数5:其他参数
         */
        channel.queueDeclare(Queue_Name, false, false, false, null);
        // 发消息
        String message = "hello world";

        // 高并发下的hashMap
        ConcurrentSkipListMap<Long, String> confirmMap = new ConcurrentSkipListMap<>();
        
     	// deliveryTag 消息编号,multiple是否批量确认
        // 消息确认成功回调
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> 
            System.out.println(String.format("确认的消息 deliveryTag:%d multiple:%s", deliveryTag, multiple));
            // 如果是批量删除需要使用headMap
            if (multiple) 
                ConcurrentNavigableMap<Long, 查看详情  

五万字|hive知识体系保姆级教程(代码片段)

文档目录如下:Hive涉及的知识点如下图所示,本文将逐一讲解:本文较长,获取本文完整PDF文档,请扫码关注公众号【五分钟学大数据】,后台发送:hivepdf,即可下载带目录的完整版Hive文档:... 查看详情

javascript保姆级教程———重难点详细解析(万字长文,建议收藏)(代码片段)

...是整理了JS中的一些重点,难点,以及不好理解的知识点本文非常详细,深入的讲解,包学包会1.JS函数1.1函数(Function)是什么?函数(方法)是由事件驱动的或者当它被调用时执行的可重复使用的代码块... 查看详情

玩转rabbitmq系列02:rabbitmq保姆级安装教程与基本消息模型实战(代码片段)

...向:java后端开发🎁我的上一篇文章:【玩转Rabbitmq系列】01:一文带你敲响Rabbitmq的大门💕如果我的文章对你有帮助,点赞、收藏、留言都是对我最大的动力【玩转Rabbitmq系列】文章直通车~【玩转Rabbitmq系列】01... 查看详情

13万字c语言从入门到精通保姆级教程2021年版(建议收藏)(代码片段)

友情提示:先关注收藏,再查看,13万字保姆级C语言从入门到精通教程。C语言保姆级配套代码与视频教程链接文章目录计算机常识什么是计算机程序?什么是计算机语言?常见的计算机语言类型有哪些?什么是C语言?C语... 查看详情

[保姆级万字教程]打造最迷人的s曲线----带你从零手撕基于huffman编码的文件压缩项目(代码片段)

基于Huffman编码的文件压缩1.文件压缩1.1什么是文件压缩?1.2为什么需要压缩1.3压缩的分类1.4压缩方法2.huffman编码的文件压缩2.1构建Huffman树2.2获取Huffman编码2.3压缩2.3.1获取源文件中每个字节出现的频次信息2.3.2根据获取到的频... 查看详情

[保姆级万字教程]打造最迷人的s曲线----带你从零手撕基于huffman编码的文件压缩项目(代码片段)

基于Huffman编码的文件压缩1.文件压缩1.1什么是文件压缩?1.2为什么需要压缩1.3压缩的分类1.4压缩方法2.huffman编码的文件压缩2.1构建Huffman树2.2获取Huffman编码2.3压缩2.3.1获取源文件中每个字节出现的频次信息2.3.2根据获取到的频... 查看详情

万字保姆级pandas核心知识操作大全

👆点击关注|设为星标|干货速递👆分享最近常用到pandas做数据处理和分析,特意总结了以下常用内容。pandas常用速查引入依赖# 导入模块import pymysqlimport pandas as pdimport numpy as npimport time# 数据库from sq... 查看详情

五万字|hive知识体系保姆级教程(代码片段)

文档目录如下:Hive涉及的知识点如下图所示,本文将逐一讲解:本文较长,获取本文完整PDF文档,请扫码关注公众号【五分钟学大数据】,后台发送:hivepdf,即可下载带目录的完整版Hive文档:... 查看详情

❤️一看就懂!保姆级实例详解stllist容器万字整理❤️(代码片段)

🎈作者:Linux猿🎈简介:CSDN博客专家🏆,C/C++、面试、刷题、算法尽管咨询我,关注我,有问题私聊!🎈关注专栏:C/C++面试通关集锦 (优质好文持续更新中……)... 查看详情

❤️万字python基础保姆式教学❤️,小白快速入门python!(代码片段)

前言又是爆肝干文的日子,继上次说要出一期Python新手入门教程系列文章后,就在不停地整理和码字,终于是把【基础入门】这一块给写出来了。高级编程的【正则表达式】和【面向对象编程】内容我在之前已经出过... 查看详情

❤️万字python基础保姆式教学❤️,小白快速入门python!(代码片段)

前言又是爆肝干文的日子,继上次说要出一期Python新手入门教程系列文章后,就在不停地整理和码字,终于是把【基础入门】这一块给写出来了。高级编程的【正则表达式】和【面向对象编程】内容我在之前已经出过... 查看详情

4万字typescript保姆级入门教程(2021版)(建议收藏)(代码片段)

极客江南:一个对开发技术特别执着的程序员,对移动开发有着独到的见解和深入的研究,有着多年的iOS、Android、HTML5开发经验,对NativeApp、HybridApp、WebApp开发有着独到的见解和深入的研究,除此之外还精通JavaScrip... 查看详情

教程万字长文保姆级教你制作自己的多功能qq机器人(代码片段)

...dn.net/)若发现存在部分图片缺失,可以访问原文:万字长文保姆级教你制作自己的多功能QQ机器人-小锋学长生活大爆炸​​​​​​目录前言功能清单免费领取轻量应用云服务器SSH连接服务器常见Ubuntu软件安装与问题修复... 查看详情

肝了10万字,go语言保姆级编程教程2021最新版(建议收藏)(代码片段)

友情提示:编辑器显示12万多字,先点赞,关注,收藏,一键三连支持,再学习。本文对比C语言进行学习Go语言,如果你有C语言基础,学习Go语言会容易很多。Go语言保姆级教程目录什么是Go语言Go语... 查看详情

五万字|spark吐血整理,学习与面试收藏这篇就够了!(代码片段)

Spark超全总结文档目录如下:Spark涉及的知识点如下图所示,本文将逐一讲解:本文档参考了关于Spark的众多资料整理而成,为了整洁的排版及舒适的阅读,对于模糊不清晰的图片及黑白图片进行重新绘制成了... 查看详情

js万字整理javascript相关基础技术面试题总结-前端面试必备-基础知识总结-秋招冲鸭(代码片段)

文章目录1.变量声明与类型1.1varletconst区别1.2数据类型1.3值类型与引用类型的区别1.4typeof能判断哪些类型1.5判断数据类型的方式1.6`===`与`==`1.7truly变量与falsely变量1.8强制类型转换和隐式类型转换1.9语句... 查看详情

如何通过docker部署rabbitmq?(保姆级教程)(代码片段)

...载速度慢,也可忽略。)二、部署步骤在线拉取RabbitMQ镜像。dockerpullrabbitmq:3-management创建容器并启动容器。dockerrun\\-eRABBITMQ_DEFAULT_USER=admin\\-eRABBITMQ_DEFAULT_PASS=123321\\--namemq\\--hostnamemq1\\-p15672:15672\\-p5672:5672\\-d\\rabbitmq:3... 查看详情