二万字长文图文详解rabbitmq6种工作模式(理论与代码相结合)(代码片段)

java小丑 java小丑     2023-01-09     619

关键词:


前言

RabbitMQ 简介:RabbitMQ 基于 AMQP 标准,采用 Erlang 语言开发的消息中间件。


提示:以下是本篇文章正文内容

一、RabbitMQ 基础架构

  • Producer:作为消息的生成者。
  • Consumer:作为消息的消费者。
  • Connection:消息的发布方或者消息的消费方 和broker 之间的 TCP 连接。
  • Channel:Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以 channel之间是完全隔离的,减少了操作系统建立 TCP connection 的开销。
  • Broker:接收和分发消息的应用,RabbitMQ服务就是Message Broker。
  • Virtual host:虚拟机,出于多租户和安全因素设计的,把 AMQP的基本组件划分到一个虚拟的分组中,可以类比mysql数据库会创建很多库,库和库之间是独立的。当多个不同的用户使用同一个RabbitMQserver 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
  • Queue:队列,消息队列,接收消息、缓存消息,消息最终被送到这里等待 consumer 取走。
  • Binding:绑定,exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
  • Exchange:交换机,message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。

交换机常用的类型有:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

二、工作模式

一、6 种工作模式理论

RabbitMQ 提供了 6 种工作模式,简单模式、work queues、Publish/Subscribe
发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算消息队列)


简单模式:一个生产者生产消息发送到队列里面,一个消费者从队列里面拿消息,进行消费消息。一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

说明:类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。


Work queues 工作队列模式:一个生产者生产消息发送到队列里面,一个或者多个消费者从队列里面拿消息,进行消费消息。一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

说明:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。应用场景:过年过节12306抢票,发短信给用户,可以接入多个短信服务进行发送,提供任务的处理速度。


Pub/Sub 订阅模式 :一个生产者生产消息发送到交换机里面,由交换机处理消息,队列与交换机的任意绑定,将消息指派给某个队列,一个或者多个消费者从队列里面拿消息,进行消费消息。需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。

说明:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!


Routing 路由模式:一个生产者生产消息发送到交换机里面,并且指定一个路由key,队列与交换机的绑定是通过路由key进行绑定的,消费者在消费的时候需要根据路由key从交换机里面拿消息,进行消费消息。需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

说明:Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。


Topics 通配符模式:一个生产者生产消息发送到交换机里面,并且使用通配符的形式(类似mysql里面的模糊查询,比如想获取一批带有item前缀的数据),队列与交换机的绑定是通过通配符进行绑定的,消费者在消费的时候需要根据根据通配符从交换机里面拿消息,进行消费消息。需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列
说明:通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词。例如:Lazy.# 能够匹配 Lazy.insert.content或者 Lazy.insert,Lazy.* 只能匹配Lazy.insert。


二、6 种工作模式的代码

一、6 种工作模式的Demo演示


创建一个Maven工程,引入pom依赖:

    <dependencies>
		<!--rabbitmq客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.3.0</version>
        </dependency>
		<!--json转换工具包-->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>

创建一个连接Rabbitmq的工具类:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitUtils 
    private static ConnectionFactory connectionFactory = new ConnectionFactory();
    static 
        connectionFactory.setHost("你的rabbitmq的ip地址");
        connectionFactory.setPort(5672);//RabbitMQ的默认端口号,根据实际情况修改
        connectionFactory.setUsername("你的rabbitmq的用户名称");
        connectionFactory.setPassword("你的rabbitmq的用户密码");
        connectionFactory.setVirtualHost("你的rabbitmq的虚拟机");
    
    public static Connection getConnection()
        Connection conn = null;
        try 
            conn = connectionFactory.newConnection();
            return conn;
         catch (Exception e) 
            throw new RuntimeException(e);
        
    


简单模式

为了区分好理解,我每个模式都去创建一个虚拟机,这里我先去rabbitMq管控页面创建一个虚拟机


修改工具类的虚拟机:

生产者:

import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer 

    public static void main(String[] args) throws Exception 
        //获取TCP长连接
        Connection conn = RabbitUtils.getConnection();
        //创建通信“通道”,相当于TCP中的虚拟连接
        Channel channel = conn.createChannel();
        //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
        //channel.queueDeclare的五个参数
        //第一个参数:队列名称ID
        //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
        //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
        //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
        //其他额外的参数, null
        channel.queueDeclare(RabbitConstant.QUEUE_TEST,false, false, false, null);
        String message = "要发送的message";
        //channel.basicPublish的四个参数
        //exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到
        //队列名称
        //额外的设置属性
        //最后一个参数是要传递的消息字节数组
        channel.basicPublish("", RabbitConstant.QUEUE_TEST, null,message.getBytes());
        channel.close();
        conn.close();
        System.out.println("===发送成功===");
    


消费者:

import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Consumer 

    public static void main(String[] args) throws Exception
        //获取TCP长连接
        Connection conn = RabbitUtils.getConnection();
        //创建通信“通道”,相当于TCP中的虚拟连接
        Channel channel = conn.createChannel();
        //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
        //第一个参数:队列名称ID
        //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
        //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
        //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
        //其他额外的参数, null
        channel.queueDeclare(RabbitConstant.QUEUE_TEST,false, false, false, null);
        //从MQ服务器中获取数据
        //创建一个消息消费者
        //第一个参数:队列名
        //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
        //第三个参数要传入DefaultConsumer的实现类
        channel.basicConsume(RabbitConstant.QUEUE_TEST, false, new Reciver(channel));
    



class  Reciver extends DefaultConsumer 

    private Channel channel;
    
    //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
    public Reciver(Channel channel) 
        super(channel);
        this.channel = channel;
    

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
         String message = new String(body);
         System.out.println("消费者接收到的消息:"+message);
         System.out.println("消息的TagId:"+envelope.getDeliveryTag());
        //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
        channel.basicAck(envelope.getDeliveryTag(), false);
    

我先启动消费者后启动生产者,这样只要生产者一生产消息,消费者就可以立马消费。


Work queues 工作队列模式

为了区分好理解,我每个模式都去创建一个虚拟机,这里我先去rabbitMq管控页面创建一个虚拟机

修改工具类的虚拟机

为了模拟某些业务,这里使用自定义实体类发送消息,所以我新建了一个自定义实体类

/**
 * 自定义的实体类:发送内容
 */
public class SenderContent 

    private String name;
    private String content;

    public SenderContent(String name, String content) 
        this.name = name;
        this.content = content;
    

    public String getName() 
        return name;
    

    public void setName(String name) 
        this.name = name;
    

    public String getContent() 
        return content;
    

    public void setContent(String content) 
        this.content = content;
    

生产者:

import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 生成者
 */
public class Producer 

    public static void main(String[] args) throws Exception 
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null);
        for(int i = 1 ; i <= 100 ; i++) 
            SenderContent senderContent = new SenderContent("姓名:" + i, "内容:" + i);
            String jsonSMS = new Gson().toJson(senderContent);
            channel.basicPublish("" , RabbitConstant.QUEUE_SENDER_CONTENT , null , jsonSMS.getBytes());
        
        System.out.println("发送数据成功");
        channel.close();
        connection.close();
    


消费者一:

import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;


/**
 * 消费者1
 */
public class ConsumerOne 

    public static void main(String[] args) throws Exception 
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null);
        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个
        channel.basicConsume(RabbitConstant.QUEUE_SENDER_CONTENT , false , new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                String jsonSMS = new String(body);
                System.out.println("ConsumerOne-发送成功:" + jsonSMS);
                try 
                    Thread.sleep(10);
                 catch (Exception e) 
                    e.printStackTrace();
                
                //确认签收
                channel.basicAck(envelope.getDeliveryTag() , false);
            
        );
    


消费者二:

import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * 消费者2
 */
public class ConsumerTwo 

    public static void main(String[] args) throws IOException 
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null);
        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个
        channel.basicConsume(RabbitConstant.QUEUE_SENDER_CONTENT , false , new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                String jsonSMS = new String(body);
                System.out.println("ConsumerTwo-发送成功:" + jsonSMS);
                try 
                    Thread.sleep(100);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                //确认签收
                channel.basicAck(envelope.getDeliveryTag() , false);
            
        );
    


消费者三:

import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * 消费者3
 */
public class ConsumerThree 


    public static void main(String[] args) throws IOException 
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare查看详情  

万字长文图文详解spring整合rabbitmq(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,ttl)(代码片段)

文章目录前言一、项目代码1.生产者1.项目架构图:2.pom.xml依赖:3.spring-rabbitmq-producer.xml:4.rabbitmq.properties:5.ProducerTest:2.消费者1.项目架构图2.pom.xml依赖:3.spring-rabbitmq-consumer.xm 查看详情

近九万字图文详解rabbitmq(代码片段)

我是廖志伟,一名Java开发工程师、幕后大佬社区创始人、Java领域优质创作者、CSDN博客专家。拥有多年一线研发经验,研究过各种常见框架及中间件的底层源码,对于大型分布式、微服务、三高架构(高性能、高... 查看详情

【万字长文】详解每日站会的各种模式

参考技术A本文首发于微信号“小船哥说敏捷”。每日站会已经成为许多团队的常见仪式,特别是在敏捷软件开发中。然而,有许多微妙的细节可以帮助我们区分是在有效的开会还是在浪费时间。每日站会(也称为“每日Scrum”、... 查看详情

二万字《算法和数据结构》三张动图,三十张彩图,c语言基础教学,之二叉搜索树详解(建议收藏)

本文已收录于专栏🌳《画解数据结构》🌳前言  我们知道,「顺序表」可以「快速索引」数据,而「链表」则可以快速的进行数据的「插入和删除」。那么,有没有一种数据结构,可以快速的实现「增... 查看详情

二万字《算法和数据结构》三张动图,三十张彩图,c语言基础教学,之二叉搜索树详解(建议收藏)

本文已收录于专栏🌳《画解数据结构》🌳前言  我们知道,「顺序表」可以「快速索引」数据,而「链表」则可以快速的进行数据的「插入和删除」。那么,有没有一种数据结构,可以快速的实现「增... 查看详情

vector的底层实现!(万字长文详解!)(代码片段)

vector的底层简单实现!vector的成员变量template<classT>classvectortypedefT*iterator;//迭代器 typedefconstT*const_iterator;private: iterator_start; iterator_finish; iterator_endofstorage;;reservevoidreserve(size_ 查看详情

golang✔️实战✔️10种加密方法实现☢️万字长文建议手收藏☢️(代码片段)

【Golang】✔️实战✔️10种加密方法实现☢️万字长文建议手收藏☢️概述md5SHA-2sha256sha512计算文件哈希base64AESDESDES3DESRSA函数详解GenerateKey()MarshalPKCS1PrivateKey()pem编码生成公钥私钥实现加密概述今天来带大家实现以下最常用的10中... 查看详情

golang✔️实战✔️10种加密方法实现☢️万字长文建议手收藏☢️(代码片段)

【Golang】✔️实战✔️10种加密方法实现☢️万字长文建议手收藏☢️概述md5SHA-2sha256sha512计算文件哈希base64AESDESDES3DESRSA函数详解GenerateKey()MarshalPKCS1PrivateKey()pem编码生成公钥私钥实现加密概述今天来带大家实现以下最常用的10中... 查看详情

万字长文详解hivesql执行计划(代码片段)

HiveSQL的执行计划描述SQL实际执行的整体轮廓,通过执行计划能了解SQL程序在转换成相应计算引擎的执行逻辑,掌握了执行逻辑也就能更好地把握程序出现的瓶颈点,从而能够实现更有针对性的优化。此外还能帮助开发者识别看... 查看详情

单例模式(万字长文精讲)(代码片段)

1、单例模式的定义单例模式(SingletonPattern),确保一个类只有一个实例,并提供对它的全局访问点。这是在java-design-patterns.com中对于单例模式的定义,其原文定义如下:Ensureaclasshasonlyoneinstance,andprovideaglob... 查看详情

二万字爆肝javase基础知识(代码片段)

文章目录🙀作者简介🙀前言一、如何选择学习资料二、我推荐的三、准备工作1.编辑器1.1EditPlus1.2Eclipse1.3IntelliJIDEA1.4MyEclipse2.计算机基础操作2.1显示文件后缀2.2常用DOS命令3.计算机语言发展史3.1机器语言3.2汇编语言3.3高级... 查看详情

万字长文详解hivesql执行计划(代码片段)

HiveSQL的执行计划描述SQL实际执行的整体轮廓,通过执行计划能了解SQL程序在转换成相应计算引擎的执行逻辑,掌握了执行逻辑也就能更好地把握程序出现的瓶颈点,从而能够实现更有针对性的优化。可以说执行计划... 查看详情

万字长文详解yolov1-v5系列模型(代码片段)

一,YOLOv1Abstract1.Introduction2.UnifiedDetectron2.1.NetworkDesign2.2Training2.4.Inferences4.1ComparisontoOtherReal-TimeSystems5,代码实现思考二,YOLOv2摘要YOLOv2的改进1,中心坐标位置预测的改进2,1个gird只能对应一个目标的改进3,backbone的改进4, 查看详情

rabbitmq6种使用模式

RabbitMQ的5种模式与实例1.1简单模式HelloWorld功能:一个生产者P发送消息到队列Q,一个消费者C接收生产者实现思路:创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtualhost,从连接工厂中获取... 查看详情

2万字长文深入详解kafka,从源码到架构全部讲透(代码片段)

????????关注后回复 “进群” ,拉你进程序员交流群????????作者:erainmhttps://blog.csdn.net/eraining/article/details/115860664-   消息队列的核心价值  - 解耦合。异步处理例如电商平台,秒杀活动。一般流程会分为:1: 风... 查看详情

c++知识分享:socket编程详解,万字长文(代码片段)

 介绍Socket编程让你沮丧吗?从manpages中很难得到有用的信息吗?你想跟上时代去编Internet相关的程序,但是为你在调用connect()前的bind()的结构而不知所措?等等…好在我已经将这些事完成了,我将和所有人共... 查看详情

2.5万字详解:23种设计模式(代码片段)

Object()是一样的性质,而工厂方法应该用于复杂对象的初始化,当需要调用有参的构造函数时便无能为力了,这样像为了工厂而工厂,没有实际意义。2不同的产品需要不同额外参数的时候不支持。6.再升级(重要)(1)工厂类:... 查看详情