rabbitmq笔记(代码片段)

週进 週进     2022-12-06     331

关键词:

本笔记代码【1~6】
本笔记代码【7~8】
说明:你好,此篇文章是我在学习尚硅谷RabbitMQ时做的笔记,如果存在问题的话欢迎指出。
视频链接

1 MQ的相关概念

1.1.1 什么是MQ

MQ(message queue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

1.1.2 为什么需要使用MQ

1、流量消峰
举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验更好。
2、应用解耦
以电商应用为例,应用中有订单系统,库存系统,物流系统,支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统处理故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
3、异步处理
有些服务间调用是异步的, 例如A调用B,B需要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询API查询,或者A提供一个callback api,B执行完之后调用api通知A服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处完成的消息,当B处理完毕后,会发送一条消息给MQ,MQ会将此消息转发给A服务。这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不用这些操作。A服务还能及时的得到异步处理成功的消息。

1.2.1 RabbitMQ的概念

RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。

1.2.2 RabbitMQ的四大核心


生产者

产生数据发送消息的程序是生产者

交互机

交互机是RabbiMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交互机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,或者是把消息丢弃,这个得有交换机类型决定。

队列

队列是RabbitMQ内部使用的一种数据结构,尽管消息流进RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接受数据。这就是我们使用队列的方式。

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

1.2.3 RabbitMQ核心


1、简单模式 2、工作模式 3、发布/订阅模式 4、路由模式 5、主题模式 6、发布确认模式

RabbitMQ的工作原理


Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Exchange:交换机
Queue:队列
Producer:生产者
Consumer:消费者
Connection:连接,publisher/consumer和broker之间的TCP连接
Channel:通道,如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时间建立TCP Connection的开销巨大的,效率也较低。Channel是在Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。channel作为轻量级的
Connection极大减少了操作系统建立TCO connection 的开销。
Binding:exchange和queue之间的虚拟连接,binding中可以包含 routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据。
一个交换机(Exchange)对应多个队列(Queue),一个消息实体(Broker)中也可以有多个交换机

1.2.4 安装RabbitMQ

Docker安装教程
在浏览器中访问
http://ip地址:15672/

2 HelloWorld

首先新建一个空模块

创建一个Maven模块



Maven配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>


    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.15.0</version>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.11.0</version>
        </dependency>
        
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>RELEASE</version>
            <scope>test</scope>
        </dependency>
        
    </dependencies>
</project>

生产者

先创建一个“生产者

package com.zhoujing.rabbltmq.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author zhoujing
 * @version 1.0
 * @createTime 2022/6/30-21:00-星期四
 *
 * 生产者发送消息
 */
public class Producer 

    /**
     * 队列名称
     */
    public static final String QUEUE_NAME = "hello";

    /**
     * 发送消息
     * @param args
     */
    public static void main(String[] args) throws IOException, TimeoutException 
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 工厂IP连接RabbitMQ的队列
        factory.setHost("服务器IP地址");
        // 15672是web访问的端口
        factory.setPort(5672);
        // 设置超时时间
        factory.setHandshakeTimeout(60000);
        // 用户名
        factory.setUsername("admin");
        // 设置密码
        factory.setPassword("admin");
        // 建立连接
        Connection connection = factory.newConnection();
        // 获取信道
        Channel channel = connection.createChannel();

        /*
        * 生成一个队列
        * 1、队列名称
        * 2、队列里面的消息是否持久化(磁盘),默认情况消息存储到内存中,true存储到磁盘中,false存储到内存当中
        * 3、该队列是否只供一个消费者进行消费。是否进行消息共享,true只供一个,false多个消费者
        * 4、是否自动删除,最后一个消费者断开连接后,该队一句是否自动删除,true自动删除,false不自动删除
        * 5、其他
        * */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 消息
        String message = "hello world!";


        /*
        * 发送一个消费
        * 1、发送到哪个交换机,先暂时不用交互机
        * 2、路由的Key值是哪个,本次是队列的名称
        * 3、其他参数信息
        * 4、发送消息的消息体
        * */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

        System.out.println("消息发送完毕");
    



使用服务器的同学记得在阿里云安全组中打开相应的端口

访问Web端

http://IP地址:15672


消费者

消费者用于接收消息

package com.zhoujing.rabbltmq.one;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author zhoujing
 * @version 1.0
 * @createTime 2022/7/1-16:23-星期五
 *
 * 消费者接收消息
 */
public class Consumer 

    public static void main(String[] args) throws IOException, TimeoutException 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("IP地址");
        factory.setPort(5672);
        factory.setHandshakeTimeout(60000);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明 接收消息
        DeliverCallback deliverCallback = (consumerTag,message)->
            // 消息分为很多,如:消息头,消息体,……,我们只需要获取消息体
            System.out.println(new String(message.getBody()));
        ;

        // 取消消息时的回调
        CancelCallback cancelCallback = consumerTag->
            System.out.println("消息消费被中断");
        ;

        /*
        * 消费者消费消息
        * 1、消费哪个队列
        * 2、消费成功之后是否自动应答,true代表的自动应答,false代表手动应答
        * 3、当消息传达后回调
        * 4、消费者取消消费的回调
        * */
        channel.basicConsume(Producer.QUEUE_NAME,true,deliverCallback,cancelCallback);
    


将生产者和消费者都运行,再将生产者重新运行几次
生产者

消费者

每当生产者重新运行,消费者都会接收到生产者发送的消息

3 Work Queues

3.1 概述

工作队列(又称为任务队列)的主要思想是避免立即执行资源密集型任务,而不是不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

3.1.1 轮询分发消息

一个生产者发送消息由多个消费者(工作线程)去接收,而多个工作线程是轮询关系而且一个消息只能被处理一次不能处理多次,而每个工作线程接收消息的情况是轮询的。
我们接下来创建两个消费者C1和C2来模拟轮询效果。

3.1.2 创建工具类

将创建连接的代码封装成工具类

package com.zhoujing.rabbltmq.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author zhoujing
 * @version 1.0
 * @createTime 2022/7/1-22:17-星期五
 *
 * RabbitMQ工具类
 */
public class RabbitMqUtils 

    /**
     * 获取信道
     * @return 信道
     * @throws IOException
     * @throws TimeoutException
     */
    public static Channel getChannel() throws IOException, TimeoutException 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("IP地址");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setHandshakeTimeout(60000);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    


3.1.3 创建两个工作线程

这次我们先创建工作线程

package com.zhoujing.rabbltmq.two;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zhoujing.rabbltmq.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author zhoujing
 * @version 1.0
 * @createTime 2022/7/1-22:40-星期五
 *
 * 工作线程
 */
public class Worker01 

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException 

        DeliverCallback deliverCallback = (consumerTag,message)->
            System.out.println(consumerTag+"接收到的消息:"+new String(message.getBody()));
        ;

        CancelCallback cancelCallback = consumerTag->
            System.out.println(consumerTag+":取消回调");
        ;

        System.out.println("C1正在等待……");
        Channel channel = RabbitMqUtils.getChannel();
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    


第一个工作线程完成,先将其启动起来,接下创建第二个工作线程,使用IDEA添加新的进程,免去了复制代码的操作




将输出语句C1修改成C2

再次点击运行,出现两个打印窗口,一个是C1,一个是C2

3.1.4 创建消费者

创建生产者

package com.zhoujing.rabbltmq.two;

import com.rabbitmq.client.Channel;
import com.zhoujing.rabbltmq.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * @author zhoujing
 * @version 1.0
 * @createTime 2022/7/3-13:37-星期日
 * <p>
 * 生产者,发送大量的消息
 */
public class Task01 

    public static final String QUEUE_NAME = "hello";

    /**
     * 发送大量的信息
     *
     * @param args
     */
    public static void main(String[] args) throws IOException, TimeoutException 
        Channel channel = RabbitMqUtils.getChannel();
        // 因为消费者有多个所以第三个参数必须为false
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Scanner input = new Scanner(System.in);
        while (input.hasNext()) 
            String message = input.next();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("消息发送完成:"+message);
        
    


3.1.5 运行

将生产者运行,连续输入值

按照轮询效果,结果应该是这样的

打开控制台查看,和预期一样
C1:
查看详情

rabbitmq学习笔记(自用)(代码片段)

...概念1.1什么是MQ1.2为什么要用MQ1.3MQ的分类1.4MQ的选择二、RabbitMQ2.1RabbitMQ的概念2.2四大核心概念三、简单案例3.1WorkQueues3.2轮训分发消息3.3消息应答3.4RabbitMQ持久化3.5不公平分发3.6预取值分发四、RabbitMQ-发布确认4.1发布确认逻辑4.2发... 查看详情

rabbitmq消息队列笔记(代码片段)

目录RabbitMQ概念四大核心概念核心部分测试"HelloWorld"测试WorkQueues消息应答自动应答手动应答消息应答方法消息自动重新入队手动应答测试持久化队列持久化消息持久化不公平分发预取值消息确认开启发布确认单个确认发布... 查看详情

学相伴狂神说rabbitmq笔记(简单使用rabbitmq)(代码片段)

目录什么是rabbitMQ使用docker安装RabbitMQ,如果没有使用过docker的可以看这篇文章https://blog.csdn.net/qq_44716544/article/details/119870837授权账号和密码设置用户分配操作权限RabbitMQ支持的消息模型1.入门案例1.RabbitMQ入门案例-Simple简单模式2.什... 查看详情

学习笔记《rabbitmq实战指南》笔记(代码片段)

本文摘录总结自《RabbitMQ实战指南》。一、消息中间件消息队列中间件(MessageQueueMiddleware,简称为MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。它... 查看详情

学习笔记《rabbitmq实战指南》笔记(代码片段)

本文摘录总结自《RabbitMQ实战指南》。一、消息中间件消息队列中间件(MessageQueueMiddleware,简称为MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。它... 查看详情

rabbitmq学习笔记(代码片段)

文章目录1.RabbitMq(消息中间件)1.概念:是基于队列模式实现的异步/同步的传输数据。2.传统的Http请求存在哪些缺点?3.mq的使用场景?4.mq服务器如何保证消息不丢失?5.VirtualHost?Exchange分派我们的消息存放在哪个队... 查看详情

rabbitmq消息队列笔记(代码片段)

...发布确认高级在生产环境中由于一些不明原因,导致RabbitMQ 重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。如何才能进行RabbitMQ的消息可靠投递呢?特别是在比较极端 查看详情

rabbitmq消息队列笔记(代码片段)

...发布确认高级在生产环境中由于一些不明原因,导致RabbitMQ 重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。如何才能进行RabbitMQ的消息可靠投递呢?特别是在比较极端 查看详情

rabbitmq学习笔记(代码片段)

文章目录RabbitMq(消息中间件)1.概念:是基于队列模式实现的异步/同步的传输数据。2.传统的Http请求存在哪些缺点?3.mq的使用场景?4.mq服务器如何保证消息不丢失?5.VirtualHost?Exchange分派我们的消息存放在哪个队列... 查看详情

rabbitmq消息队列笔记(代码片段)

...迟队列优化 插件实现延迟队列安装插件测试插件交换机RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。生产者 查看详情

rabbitmq消息队列笔记(代码片段)

...迟队列优化 插件实现延迟队列安装插件测试插件交换机RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。生产者 查看详情

rabbitmq笔记(代码片段)

RabbitMQ1MQ引言1.1什么是MQMQ(MessageQuene):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接... 查看详情

rabbitmq笔记(代码片段)

RabbitMQ1MQ引言1.1什么是MQMQ(MessageQuene):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接... 查看详情

rabbitmq笔记springboot整合rabbitmq之simple容器(生产者)(代码片段)

...简介  本文主要用使用SpringBoot(2.5.2)来整合RabbitMQ(2.5.2),使用simple容 查看详情

rabbitmq学习笔记(代码片段)

视频教程【编程不良人】MQ消息中间件之RabbitMQ以及整合SpringBoot2.x实战教程1.MQ引言1.1什么是MQMQ(MessageQuene):消息队列,也叫消息中间件。通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费... 查看详情

rabbitmq学习笔记-p2(springamqp)(代码片段)

...超级推荐!!SpringAMQP1.初识SpringAMQPSpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。SpringAmqp的官方地址:https://spring.io/projects/spring-am 查看详情

rabbitmq学习笔记(代码片段)

rabbitmqAMQP协议安装如何使用AMQP协议生产者消费者队列一个队列存多条消息消息连接发消息和接收消息都是必须先建立连接通道(channel)一个连接有多个通道,消息会在通道里面流转,之后到达消费者交换机(... 查看详情