学相伴狂神说rabbitmq笔记(简单使用rabbitmq)(代码片段)

冷血~多好 冷血~多好     2023-03-21     525

关键词:

目录

什么是rabbitMQ

使用docker安装RabbitMQ,如果没有使用过docker的可以看这篇文章https://blog.csdn.net/qq_44716544/article/details/119870837

授权账号和密码

设置用户分配操作权限

RabbitMQ支持的消息模型

1.入门案例

1. RabbitMQ入门案例 - Simple 简单模式

2. 什么是AMQP

01 什么是AMQP

02 AMQP生产者流转过程

03 AMQP消费者流转过程

3. RabbitMQ的核心组成部分

01 RabbitMQ的核心组成部分

02 RabbitMQ整体架构是什么样子的?

03 RabbitMQ的运行流程

4. RabbitMQ入门案例 - fanout 模式

01 RabbitMQ的模式之发布订阅模式

5. RabbitMQ入门案例 - Direct 模式

6. RabbitMQ入门案例 - Topic 模式

完整案例(创建交换机,创建队列,交换机与队列绑定)

7. RabbitMQ入门案例 - Work模式

01 Work模式轮询模式(Round-Robin)

01轮询模式

02 Work模式公平分发模式 

8. RabbitMQ使用场景

01 解耦、削峰、异步

02 高内聚,低耦合

四、Springboot案例

1. Fanout 模式

2. Direct 模式

3. Topic 模式

五、RabbitMQ高级

2. 死信队列

3. 内存磁盘的监控

01 RabbitMQ内存警告

02 RabbitMQ的内存控制

03 RabbitMQ的内存换页

04 RabbitMQ的磁盘预警

4. 集群(docker集群rabbitmq)

1.先创建三个rabbitmq容器

2.容器节点加入集群


什么是rabbitMQ

 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,能够实现异步消息处理
 RabbitMQ是一个消息代理:它接受和转发消息。
你可以把它想象成一个邮局:当你把你想要发布的邮件放在邮箱中时,你可以确定邮差先生最终将邮件发送给你的收件人。在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。 
RabbitMQ和邮局的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块
        
优点:异步消息处理
           业务解耦(下订单操作:扣减库存、生成订单、发红包、发短信),将下单操作主流程:扣减库存、生成订单然后通过MQ消息队列完成通知,发红包、发短信
            错峰流控 (通知量 消息量 订单量大的情况实现MQ消息队列机制,淡季情况下访问量会少)
        
              灵活的路由(Flexible Routing) 
             在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
        RabbitMQ网站端口号:15672
        程序里面实现的端口为:5672


       

 

使用docker安装RabbitMQ,如果没有使用过docker的可以看这篇文章https://blog.csdn.net/qq_44716544/article/details/119870837

1.拉取RabbitMQ镜像

docker pull rabbitmq:management

2.运行RabbitMQ镜像

docker run -itd --name rabbit01 --hostname myrabbit -e RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian -p 15672:15672 -p  5672:5672 -p 25672:25672 rabbitmq:management

注意:RABBITMQ_DEFAULT_USER=chenjinxian -e RABBITMQ_DEFAULT_PASS=chenjinxian 

   这里设置的是(RABBITMQ_DEFAULT_USER)登录的账号和( RABBITMQ_DEFAULT_PASS)密码,根据自身来修改

 这里看到容器已经开启成功了,然后就可以使用了

 

3.通过浏览器打开

如果你使用的是本地虚拟机,那么你直接使用虚拟机显示的ipv4地址加端口号就可以访问了;

如果你使用的是云服务器,那么你需要在对应服务器(阿里云,腾讯云等)的安全组中开放15672端口,并且在防火墙中也开放15672端口

 

 显示如上图那么就可以开始使用了

然后通过命令进入rabbitmq容器

docker exec -it rabbit01 /bin/bash

授权账号和密码

rabbitmqctl add_user admin admin

设置用户分配操作权限

rabbitmqctl set_user_tags admin administrator

用户级别:

  1. administrator:可以登录控制台、查看所有信息、可以对 rabbitmq进行管理

  2. monitoring:监控者 登录控制台,查看所有信息

  3. policymaker:策略制定者 登录控制台,指定策略

  4. managment 普通管理员 登录控制台

为用户添加资源权限

rabbitmqctl set_permissions -p / admin ".*"".*"".*"

也可以在界面操作进行添加用户

 

RabbitMQ支持的消息模型

1.简单模式 Simple  

 

     2.工作模式 Work

       

 

     3.发布订阅模式

 4.路由模式

5.主题 Topic模式

 

      6.参数模式

7.出版商确认模式

 

1.入门案例

1. RabbitMQ入门案例 - Simple 简单模式

  1. jdk1.8

  2. 构建一个 maven工程

  3. 定义生产者

  4. 定义消费者

  5. 观察消息的在 rabbitmq-server服务中的进程

 

01 构建一个maven工程

02 导入依赖

 <dependencies>
        <!--导入rabbitmq的依赖-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.13.0</version>
        </dependency>

    </dependencies>

3.代码编写 

 在上图的模型中,有以下概念:

  1. 生产者,也就是要发送消息的程序

  2. 消费者:消息的接受者,会一直等待消息到来。

  3. 消息队列:图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

生产者

package com.chen.rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * @description: 简单模式Simple
 */
public class Producer 


    public static void main(String[] args) 

        // 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port

        // 1: 创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("128.197.157.151");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("chenjinxian");//rabbitmq登录的账号
        connectionFactory.setPassword("chenjinxian");//rabbitmq登录的密码
        connectionFactory.setVirtualHost("/");

        //springboot ---rabbitmq

        Connection connection = null;
        Channel channel = null;
        try 
            // 2: 创建连接Connection Rabbitmq为什么是基于channel去处理而不是链接? 长连接----信道channel
            connection = connectionFactory.newConnection("生成者");
            // 3: 通过连接获取通道Channel
            channel = connection.createChannel();
            // 4: 通过通创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
            String queueName = "queue1";

            /*
             * @params1 队列的名称
             * @params2 是否要持久化durable=false 所谓持久化消息是否存盘,如果false 非持久化 true是持久化? 非持久化会存盘吗? 会存盘,但是会随从重启服务会丢失。
             * @params3 排他性,是否是独占独立
             * @params4 是否自动删除,随着最后一个消费者消息完毕消息以后是否把队列自动删除
             * @params5 携带附属参数
             */
            channel.queueDeclare(queueName, true, false, false, null);
            // 5: 准备消息内容
            String message = "Hello chenjinxian!!!";
            // 6: 发送消息给队列queue
            // @params1: 交换机  @params2 队列、路由key @params 消息的状态控制  @params4 消息主题
            // 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定会存在一个默认的交换机。
            channel.basicPublish("", queueName, null, message.getBytes());

            System.out.println("消息发送成功!!!");
         catch (Exception ex) 
            ex.printStackTrace();
         finally 
            // 7: 关闭通道
            if (channel != null && channel.isOpen()) 
                try 
                    channel.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
            // 8: 关闭连接

            if (connection != null && connection.isOpen()) 
                try 
                    connection.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
        


    

 

 

消费者

package com.chen.rabbitmq.simple;

import com.rabbitmq.client.*;
import java.io.IOException;

public class Consumer 


    public static void main(String[] args) 

        // 所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
        // ip port

        // 1: 创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("128.197.157.151");//服务器IP
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("chenjinxian");
        connectionFactory.setPassword("chenjinxian");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try 
            // 2: 创建连接Connection
            connection = connectionFactory.newConnection("消费者");
            // 3: 通过连接获取通道Channel
            channel = connection.createChannel();
            // 4: 通过通创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息


            // true = ack 正常的逻辑是没问题 死循环 rabbit 重发策略
            // false = nack 消息这在消费消息的时候可能会异常和故障
            final  Channel channel2 = channel;
            channel2.basicConsume("queue1", false, new DeliverCallback() 
                public void handle(String consumerTag, Delivery message) throws IOException 
                    try 
                        System.out.println("收到消息是" + new String(message.getBody(), "UTF-8"));
                        channel2.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    catch (Exception ex)
                        ex.printStackTrace();
                        // 三次确认 -- reject + sixin
                    

                
            , new CancelCallback() 
                public void handle(String consumerTag) throws IOException 
                    System.out.println("接受失败了...");
                
            );

            System.out.println("开始接受消息");
            System.in.read();

         catch (Exception ex) 
            ex.printStackTrace();
         finally 
            // 7: 关闭通道
            if (channel != null && channel.isOpen()) 
                try 
                    channel.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
            // 8: 关闭连接

            if (connection != null && connection.isOpen()) 
                try 
                    connection.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
        


    

 

2. 什么是AMQP

01 什么是AMQP

AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计

02 AMQP生产者流转过程

 

03 AMQP消费者流转过程

 

3. RabbitMQ的核心组成部分

01 RabbitMQ的核心组成部分

 

核心概念: 核心概念:
Server :又称Broker ,接受客户端的连接,实现AMQP实体服务。安装rabbitmq-serverConnection:连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手 服务器:又称Broker,接受客户端的连接,实现AMQP实体服务。安装Rabbitmq-serverConnection:连接,应用程序与Broker的网络连接tcp/ip/三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进 息读写的通道,客户端可以建立对恪Channel,每个Channel代表一个会话任务。 频道:网络信道,几乎所有的操作都在频道中进行频道,是进息读写的通道,客户端可以建立对恪频道频道,每个频道代表一个会话任务频道。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,如消息的优先级,延迟等高级特性,Body则就是消息体的内容。 消息:消息:服务与应用程序之间传送的数据,由Properties和Body组成,Properties可是对消息进行修饰,如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange 虚拟主机虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange :交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)Bindings : Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key. 交换:交换机,接受消息,根据路由键发送消息到绑定的队列.(=不具备消息存储的能力==)绑定:Exchange和Queue之间的虚拟连接,Binding中可以保护多个路由密钥。
Routing key :是一个路由规则,虚拟机可以用它来确定如何路由一个特疋消恳.bttos:/bloq.csdn.net/qg _4485823(Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费苦。"gwa" 路由密钥:是一个路由规则,虚拟机可以用它来确定如何路由一个特征消息(队列:队列:也成为消息队列,消息队列,保存消息并将它们转发给消费者.

 

02 RabbitMQ整体架构是什么样子的?

03 RabbitMQ的运行流程

 

 

4. RabbitMQ入门案例 - fanout 模式

01 RabbitMQ的模式之发布订阅模式

 

发布订阅模式的具体实现

  1. 类型:fanout

  2. 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式

(注意这里已经在可视化界面让队列绑定了交换机)

 

生产者

package com.chen.rabbitmq.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 发布订阅模式的具体实现
 类型:fanout
 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
 */
public class Producer 
    public static void main(String[] args) 
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("128.156.157.161");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("chenjinxian");
        connectionFactory.setPassword("chenjinxian");
        Connection connection = null;
        Channel channel = null;
        try 
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();

            // 5: 准备发送消息的内容
            String message = "hello xuexi!!!";

            // 6:准备交换机
            String exchangeName = "fanout_change";

            // 8: 指定交换机的类型
            String type = "fanout";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routingkey
            // @params3: 属性配置
            // @params4: 发送消息的内容
            //  #.course.* queue3
            // *.order.# queue2 ta
            // com.order.course.xxx collecion
            channel.basicPublish(exchangeName,"", null, message.getBytes());


            System.out.println("消息发送成功!");
         catch (Exception ex) 
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
         finally 

            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) 
                try 
                    channel.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
            if (connection != null) 
                try 
                    connection.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
        
    

消费者

package com.chen.rabbitmq.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 发布订阅模式的具体实现
 类型:fanout
 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
 */
public class Consumer 

    private static Runnable runnable = new Runnable() 
        public void run() 
            // 1: 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2: 设置连接属性
            connectionFactory.setHost("128.156.157.151");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("chenjinxian");
            connectionFactory.setPassword("chenjinxian");
            //获取队列的名称
            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try 
                // 3: 从连接工厂中获取连接
                connection = connectionFactory.newConnection("生产者");
                // 4: 从连接中获取通道channel
                channel = connection.createChannel();
                // 5: 申明队列queue存储消息
                /*
                 *  如果队列不存在,则会创建
                 *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                 *
                 *  @params1: queue 队列的名称
                 *  @params2: durable 队列是否持久化
                 *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                 *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
                 *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
                 * */
                // 这里如果queue已经被创建过一次了,可以不需要定义
                //channel.queueDeclare("queue1", false, false, false, null);
                // 6: 定义接受消息的回调
                Channel finalChannel = channel;
                finalChannel.basicConsume(queueName, true, new DeliverCallback() 
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException 
                        System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                    
                , new CancelCallback() 
                    @Override
                    public void handle(String s) throws IOException 
                    
                );
                System.out.println(queueName + ":开始接受消息");
                System.in.read();
             catch (Exception ex) 
                ex.printStackTrace();
                System.out.println("发送消息出现异常...");
             finally 
                // 7: 释放连接关闭通道
                if (channel != null && channel.isOpen()) 
                    try 
                        channel.close();
                     catch (Exception ex) 
                        ex.printStackTrace();
                    
                
                if (connection != null && connection.isOpen()) 
                    try 
                        connection.close();
                     catch (Exception ex) 
                        ex.printStackTrace();
                    
                
            
        
    ;



    public static void main(String[] args) 
        // 启动三个线程去执行
        new Thread(runnable, "queue1").start();
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
        new Thread(runnable, "queue4").start();
        //new Thread(runnable, "queue5").start();
    

 

 

5. RabbitMQ入门案例 - Direct 模式

 (注意这里已经在可视化界面让队列绑定了交换机)

生产者

package com.chen.rabbitmq.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 Direct 模式
 */
public class Producer 
    public static void main(String[] args) 
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("128.176.157.151");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("chenjinxian");
        connectionFactory.setPassword("chenjinxian");
        Connection connection = null;
        Channel channel = null;
        try 
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();

            // 5: 准备发送消息的内容
            String message = "hello direct_exchange!!!";

            // 6:准备交换机
            String exchangeName = "direct_exchange";
            // 7: 定义路由key
            String routeKey = "email";
            // 8: 指定交换机的类型
            String type = "direct";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routingkey
            // @params3: 属性配置
            // @params4: 发送消息的内容
            //  #.course.* queue3
            // *.order.# queue2 ta
            // com.order.course.xxx collecion
            channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
            System.out.println("消息发送成功!");
         catch (Exception ex) 
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
         finally 
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) 
                try 
                    channel.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
            if (connection != null) 
                try 
                    connection.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
        
    

消费者

package com.chen.rabbitmq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 Direct 模式
 */
public class Consumer 

    private static Runnable runnable = new Runnable() 
        public void run() 
            // 1: 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2: 设置连接属性
            connectionFactory.setHost("123.156.147.151");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setUsername("chenjinxian");
            connectionFactory.setPassword("chenjinxian");
            //获取队列的名称
            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try 
                // 3: 从连接工厂中获取连接
                connection = connectionFactory.newConnection("生产者");
                // 4: 从连接中获取通道channel
                channel = connection.createChannel();
                // 5: 申明队列queue存储消息
                /*
                 *  如果队列不存在,则会创建
                 *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
                 *
                 *  @params1: queue 队列的名称
                 *  @params2: durable 队列是否持久化
                 *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
                 *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
                 *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
                 * */
                // 这里如果queue已经被创建过一次了,可以不需要定义
                //channel.queueDeclare("queue1", false, false, false, null);
                // 6: 定义接受消息的回调
                Channel finalChannel = channel;
                finalChannel.basicConsume(queueName, true, new DeliverCallback() 
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException 
                        System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                    
                , new CancelCallback() 
                    @Override
                    public void handle(String s) throws IOException 
                    
                );
                System.out.println(queueName + ":开始接受消息");
                System.in.read();
             catch (Exception ex) 
                ex.printStackTrace();
                System.out.println("发送消息出现异常...");
             finally 
                // 7: 释放连接关闭通道
                if (channel != null && channel.isOpen()) 
                    try 
                        channel.close();
                     catch (Exception ex) 
                        ex.printStackTrace();
                    
                
                if (connection != null && connection.isOpen()) 
                    try 
                        connection.close();
                     catch (Exception ex) 
                        ex.printStackTrace();
                    
                
            
        
    ;



    public static void main(String[] args) 
        // 启动三个线程去执行
        new Thread(runnable, "queue1").start();
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
        new Thread(runnable, "queue4").start();
      //  new Thread(runnable, "queue5").start();
    

6. RabbitMQ入门案例 - Topic 模式

  (注意这里已经在可视化界面让队列绑定了交换机)

 生产者

package com.chen.rabbitmq.topics;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 Topic模式
 */
public class Producer 
    public static void main(String[] args) 
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("125.156.157.151");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("chenjinxian");
        connectionFactory.setPassword("chenjinxian");
        Connection connection = null;
        Channel channel = null;
        try 
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();

            // 5: 准备发送消息的内容
            String message = "hello topic_exchange!!!";

            // 6:准备交换机
            String exchangeName = "topic_exchange";
            // 7: 定义路由key
            String routeKey = "com.order.user";
            // 8: 指定交换机的类型
            String type = "topic";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routingkey
            // @params3: 属性配置
            // @params4: 发送消息的内容
            //  #.course.* queue3
            // *.order.# queue2 ta
            // com.order.course.xxx collecion
            channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
            System.out.println("消息发送成功!");
         catch (Exception ex) 
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
         finally 
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) 
                try 
                    channel.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
            if (connection != null) 
                try 
                    connection.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
        
    

 消费者不变

完整案例(创建交换机,创建队列,交换机与队列绑定)

package com.chen.rabbitmq.all;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
完整案例
 */
public class Producer 
    public static void main(String[] args) 
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("151.156.157.151");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("chenjinxian");
        connectionFactory.setPassword("chenjinxian");
        Connection connection = null;
        Channel channel = null;
        try 
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            String message = " 你好,小白";
            // 交换机
            String  exchangeName = "direct_message_exchange";
            // 交换机的类型 direct/topic/fanout/headers
            String exchangeType = "direct";

            // 如果你用界面把queueu 和 exchange的关系先绑定话,你代码就不需要在编写这些声明代码可以让代码变得更加简洁,但是不容读懂
            // 如果用代码的方式去声明,我们要学习一下
            // 7: 声明交换机 所谓的持久化就是指,交换机会不会随着服务器重启造成丢失,如果是true代表不丢失,false重启就会丢失
            channel.exchangeDeclare(exchangeName,exchangeType,true);

            // 8: 声明队列
            channel.queueDeclare("queue5",true,false,false,null);
            channel.queueDeclare("queue6",true,false,false,null);
            channel.queueDeclare("queue7",true,false,false,null);

            // 9:绑定队列和交换机的关系
            channel.queueBind("queue5",exchangeName,"order");
            channel.queueBind("queue6",exchangeName,"order");
            channel.queueBind("queue7",exchangeName,"course");

            channel.basicPublish(exchangeName, "course", null, message.getBytes());
            System.out.println("消息发送成功!");
         catch (Exception ex) 
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
         finally 
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) 
                try 
                    channel.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
            if (connection != null) 
                try 
                    connection.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
        
    

 执行完后生成队列和交换机

 

 

7. RabbitMQ入门案例 - Work模式

01 Work模式轮询模式(Round-Robin)

 

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?

主要有两种模式:

  1. 轮询模式的分发:一个消费者一条,按均分配

  2. 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配

01轮询模式

生产者

package com.chen.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 轮询模式
 */
public class Producer 
    public static void main(String[] args) 
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("123.156.147.151");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("chenjinxian");
        connectionFactory.setPassword("chenjinxian");
        Connection connection = null;
        Channel channel = null;
        try 
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            //===============================end topic模式==================================
            for (int i = 1; i <= 20; i++) 
                //消息的内容
                String msg = "学相伴:" + i;
                // 7: 发送消息给中间件rabbitmq-server
                // @params1: 交换机exchange
                // @params2: 队列名称/routingkey
                // @params3: 属性配置
                // @params4: 发送消息的内容
                channel.basicPublish("", "queue1", null, msg.getBytes());
            
            System.out.println("消息发送成功!");
         catch (Exception ex) 
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
         finally 
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) 
                try 
                    channel.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
            if (connection != null) 
                try 
                    connection.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
        
    

消费者

package com.chen.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 轮询模式
 */
public class Work1 
    public static void main(String[] args) 
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("123.156.147.155");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("chenjinxian");
        connectionFactory.setPassword("chenjinxian");
        Connection connection = null;
        Channel channel = null;
        try 
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work1");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            // 这里如果queue已经被创建过一次了,可以不需要定义
//            channel.queueDeclare("queue1", false, false, false, null);
            // 同一时刻,服务器只会推送一条消息给消费者
            // 6: 定义接受消息的回调
            Channel finalChannel = channel;
            //finalChannel.basicQos(1);

            finalChannel.basicConsume("queue1", true, new DeliverCallback() 
                @Override
                public void handle(String s, Delivery delivery) throws IOException 
                    try
                        System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(200);
                    catch(Exception ex)
                        ex.printStackTrace();
                    
                
            , new CancelCallback() 
                @Override
                public void handle(String s) throws IOException 
                
            );
            System.out.println("Work1-开始接受消息");
            System.in.read();
         catch (Exception ex) 
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
         finally 
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) 
                try 
                    channel.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
            if (connection != null && connection.isOpen()) 
                try 
                    connection.close();
                 catch (Exception ex) 
                    ex.printStackTrace();
                
            
        
    
package com.chen.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 轮询模式
 */
public class Work2 
    public static void main(String[] args) 
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("123.195.157.151");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("chenjinxian");
        connectionFactory.setPassword("chenjinxian");
        Connection connection = null;
        Channel channel = null;
        try 
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("消费者-Work2");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通

学相伴狂神说rabbitmq笔记(简单使用rabbitmq)(代码片段)

目录什么是rabbitMQ使用docker安装RabbitMQ,如果没有使用过docker的可以看这篇文章https://blog.csdn.net/qq_44716544/article/details/119870837授权账号和密码设置用户分配操作权限RabbitMQ支持的消息模型1.入门案例1.RabbitMQ入门案例-Simple简单模式2.什... 查看详情

狂神说java个人笔记-javaweb

该笔记很多内容来自B站狂神说Java的视频,视频地址在下面 查看详情

狂神说javamybatis笔记

https://blog.csdn.net/ddddeng_/article/details/106927021 查看详情

springmvc狂神说-springmvc笔记

链接。 查看详情

springboot狂神说-springboot笔记

链接。 查看详情

b站狂神说java笔记-面向对象编程(代码片段)

目录狂神视频地址1.面向过程&面向对象2.构造方法3.封装4.继承5.重写6.多态7.static详解8.抽象类9.接口10.内部类狂神视频地址https://www.bilibili.com/video/BV12J41137hu1.面向过程&面向对象面向过程思想:  步骤清晰简单、第一步... 查看详情

遇见狂神说---javaweb部分笔记(代码片段)

传送门==>B站遇见狂神说—JavaWeb入门到实战最近开始复习web部分;笔记和练习只是跟着视频整理的;但是有的知识点并没有整理进来(比如一些基础概念,安装步骤)ml1.web基本;Tomcat安装使用2.http详细讲解3.maven3.1IDEA中使用Maven3.1... 查看详情

b站狂神说java---记录springmvc学习笔记(代码片段)

B站学习传送门==>【狂神说Java】SpringMVC最新教程IDEA版通俗易懂感谢狂神❤ml1.回顾servlet2.SpringMVC概述3.SpringMVC执行原理4.深入理解5.使用注解进行springmvc的配置6.controller配置小结首先看看实现Controller接口的方式使用注解的... 查看详情

狂胜说mybatis笔记(代码片段)

目录狂神说MyBatis01:第一个程序狂神说MyBatis02:CRUD操作及配置解析狂神说MyBatis03:ResultMap及分页狂神说MyBatis04:使用注解开发狂神说MyBatis05:一对多和多对一处理多对一的处理一对多的处理狂神说MyBatis06:... 查看详情

rabbitmq超详细学习笔记(章节清晰+通俗易懂)(代码片段)

...学习,最近花了一段时间系统学习了当下最为主流的RabbitMQ消息队列,学习过程中也随时记录,刚开始学习的时候懵懵懂懂,做的笔记都比较杂乱,系统学习完后我将笔记内容不断反复修改,对章节进行设... 查看详情

b站狂神说java笔记-java入门学习

目录狂神视频地址Java特性Java三大版本JDK、JRE、JVM是什么?Java程序的运行机制狂神视频地址https://www.bilibili.com/video/BV12J41137huJava特性简单性面向对象可移植性(跨平台)高性能分布式动态性(反射)多线程安全... 查看详情

狂神说spring笔记全链接

推荐新手看的学习spring框架的视频,来自BiliBili狂神说视频地址BiliBili视频地址笔记链接共9篇最重要的就是理解控制反转(IoC)和面向切面编程(AOP)的思想文章链接01.概述及IOC理论推导https://mp.weixin.qq.com/s/VM6... 查看详情

狂神说spring笔记全链接

推荐新手看的学习spring框架的视频,来自BiliBili狂神说视频地址BiliBili视频地址笔记链接共9篇最重要的就是理解控制反转(IoC)和面向切面编程(AOP)的思想文章链接01.概述及IOC理论推导https://mp.weixin.qq.com/s/VM6... 查看详情

狂神说spring笔记全链接

推荐新手看的学习spring框架的视频,来自BiliBili狂神说视频地址BiliBili视频地址笔记链接共9篇最重要的就是理解控制反转(IoC)和面向切面编程(AOP)的思想文章链接01.概述及IOC理论推导https://mp.weixin.qq.com/s/VM6... 查看详情

节清晰+通俗易懂)(代码片段)

...学习,最近花了一段时间系统学习了当下最为主流的RabbitMQ消息队列,学习过程中也随时记录,刚开始学习的时候懵懵懂懂,做的笔记都比较杂乱,系统学习完后我将笔记内容不断反复修改,对章节进行设... 查看详情

b站狂神说java笔记-java流程控制(代码片段)

目录狂神视频地址1.Scanner类next():nextLine():2.顺序结构3.选择结构4.switch多选择语句5.循环结构狂神视频地址https://www.bilibili.com/video/BV12J41137hu1.Scanner类  通过scanner类的next()与nextLine()方法获取输入的字符串,在读取我们一... 查看详情

狂神说java笔记--多线程详解部分笔记(代码片段)

传送门==>B站遇见狂神说Java多线程详解做笔记时有的知识点并没有整理;ml1.线程创建之继承Thread类图片下载练习2.线程创建之实现Runnable接口买票案例模拟龟兔赛跑3.线程创建之实现Callable接口4.静态代理模式5.Lambda表达式6.... 查看详情

通俗易懂的讲讲什么是中间件?(代码片段)

学习视频:【学相伴】RabbitMQ最新完整教程IDEA版通俗易懂|KuangStudy|狂神说|学相伴飞哥_哔哩哔哩_bilibili目录一、中间件简介1.1、什么是中间件1.2、为什么要使用中间件1.3、中间件特点1.4、什么时候使用中间件技术1.5、常用中... 查看详情