rabbitmq学习笔记(持续更新ing)(代码片段)

抠脚的大灰狼 抠脚的大灰狼     2023-02-28     357

关键词:

快速入门(java)

  1. 首先安装rabbitmq(单机版)

    rabbitmq的安装(官网文档)

    在我自己租的云服务器上,直接用docker进行安装(一行命令搞定)

    docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
    

    然后在阿里云的控制台,放开567215672端口

    随后,可以直接登录rabbitmq的管理后台http://127.0.0.1:15672,便能看到rabbitmq的情况

    rabbit会创建一个默认的用户,用户名guest,密码guest

  2. 基于java编写一个简单的生产者和消费者

    rabbitmq的java教程(官网文档)

    创建一个简单的maven项目,引入rabbitmq的java依赖包

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
    

    把rabbitmq的相关信息放在一个常量类中

    package com.yogurt.demo.rabbit;
    
    /**
     * @Author yogurtzzz
     * @Date 2021/12/14 9:42
     **/
    public class Constants 
    
    	private Constants()  
    
    	public static final String RABBIT_IP = "127.0.0.1";
    
    	public static final int RABBIT_PORT = 5672;
    
    	public static final String RABBIT_USER = "guest";
    
    	public static final String RABBIT_PASSWORD = "guest";
    
    	public static final String RABBIT_QUEUE_NAME = "hello";
    
    
    

    编写一个生产者,负责推送消息到rabbit

    package com.yogurt.demo.rabbit;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    
    import static com.yogurt.demo.rabbit.Constants.*;
    
    public class Send 
    
    	public static void main(String[] argv) throws Exception 
            // 连接工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		// 设置连接信息, ip, 端口号, 账号, 密码
            factory.setHost(RABBIT_IP);
    		factory.setPort(RABBIT_PORT);
    		factory.setUsername(RABBIT_USER);
    		factory.setPassword(RABBIT_PASSWORD);
    		// 创建连接, 发送消息 (使用try-with-resource)
    		try (Connection connection = factory.newConnection()) 
    				String message = "Hello Rabbit";
    				Channel channel = connection.createChannel();
                    //如果该名称的队列不存在, 则新建一个
    				channel.queueDeclare(RABBIT_QUEUE_NAME, false, false, false, null);
    				// 向该队列发送一条消息	
                channel.basicPublish("", RABBIT_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
    				System.out.println(" [x] Sent '" + message + "'");
    		
    	
    
    

    跑起来!

    然后我们登录管理页面看看

    可以看到名为hello的队列中,有1条消息,我们可以点击队列的名称,然后点击Get Messages,获取队列中的消息,可以看到这条消息的内容是Hello Rabbit

    说明消息成功发送到rabbitmq当中了

    随后,我们编写一个消费者

    package com.yogurt.demo.rabbit;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.nio.charset.StandardCharsets;
    import static com.yogurt.demo.rabbit.Constants.*;
    
    /**
     * @Author yogurtzzz
     * @Date 2021/12/14 9:42
     **/
    public class Recv 
    
    	public static void main(String[] args) 
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost(RABBIT_IP);
    		factory.setPort(RABBIT_PORT);
    		factory.setUsername(RABBIT_USER);
    		factory.setPassword(RABBIT_PASSWORD);
    
    		// 获取连接
    		Connection connection = null;
    		try 
    			connection = factory.newConnection();
    			Channel channel = connection.createChannel();
    			channel.queueDeclare(RABBIT_QUEUE_NAME, false, false, false, null);
    			System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
    			DeliverCallback deliverCallback = (consumerTag, delivery) -> 
    				String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    				System.out.println(" [x] Received '" + message + "'");
    			;
    			channel.basicConsume(RABBIT_QUEUE_NAME, true, deliverCallback, consumerTag ->  );
    		 catch (Exception e) 
    			e.printStackTrace();
    		
    	
    
    

    跑起来!

    消费者成功消费到了

上面的示例就是一个最基本的模型,只有一个生产者,一个队列,一个消费者。

下面演示一个生产者,多个消费者的情况

这是一种竞争消费的模式,在一个队列上,绑定了多个消费者,消费者会争抢着消费消息。

生产者

package com.yogurt.demo.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import static com.yogurt.demo.rabbit.Constants.*;

public class Send 


	public static void main(String[] argv) throws Exception 
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(RABBIT_IP);
		factory.setPort(RABBIT_PORT);
		factory.setUsername(RABBIT_USER);
		factory.setPassword(RABBIT_PASSWORD);
		// 获取连接, 发送消息
		try (Connection connection = factory.newConnection()) 
			// 从控制台读入
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
			while (true) 
				String message = reader.readLine();
                // 输入 -1 则表示退出
				if ("-1".equals(message)) return;
				Channel channel = connection.createChannel();
				channel.queueDeclare(RABBIT_QUEUE_NAME, false, false, false, null);
				channel.basicPublish("", RABBIT_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
				System.out.println(" [x] Sent '" + message + "'");
			
		
	

消费者

package com.yogurt.demo.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static com.yogurt.demo.rabbit.Constants.*;

/**
 * @Author yogurtzzz
 * @Date 2021/12/14 9:42
 **/
public class Recv implements Runnable

	@Override
	public void run() 
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(RABBIT_IP);
		factory.setPort(RABBIT_PORT);
		factory.setUsername(RABBIT_USER);
		factory.setPassword(RABBIT_PASSWORD);

		long threadId = Thread.currentThread().getId();
		// 获取连接
		Connection connection = null;
		try 
			connection = factory.newConnection();
			Channel channel = connection.createChannel();
			channel.queueDeclare(RABBIT_QUEUE_NAME, false, false, false, null);
			System.out.println("Thread " + threadId + " [*] Waiting for messages. To exit press CTRL+C");

			DeliverCallback deliverCallback = (consumerTag, delivery) -> 
				String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
				System.out.println("Thread " + threadId + " [x] Received '" + message + "'");
			;
			channel.basicConsume(RABBIT_QUEUE_NAME, true, deliverCallback, consumerTag ->  );
		 catch (Exception e) 
			e.printStackTrace();
		
	

	public static void main(String[] args) throws IOException 
		Runnable runnable = new Recv();
		// 启动5个消费者
		for (int i = 0; i < 5; i++) 
			new Thread(runnable).start();
		
		// stuck here
		System.in.read();
	


先启动5个消费者

可以在管理后台看到现在有5个连接

再启动生产者,并在控制台输入一些信息

可以看到发送到rabbitmq的三条消息,成功被消费者消费(5个消费者争抢着消费,一条消息只会被一个消费者消费,此种模式下,rabbitmq会依次将消息推送给消费者,根据下图可以观察到,消费者的启动顺序为15,16,13,14,12。rabbitmq也按照这个顺序(轮询,Round-Robin)依次把消息交给对应的消费者进行消费)

快速入门(springboot)

上面介绍的是基于java的简单教程,但是通常我们开发一个应用,会使用到框架,其中又以springboot为代表。下面介绍rabbitmq整合springboot的基本使用

  1. 创建一个springboot项目

  2. pom.xml中添加如下依赖

    <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  3. application.yml中配置rabbitmq的地址等

    spring:
      application:
        name: rabbitmq-demo
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: yogurt
        password: yogurt
        virtual-host: /test
    
  4. 添加配置类,配置队列,consumer工厂,消息转换器等

    package com.demo.rabbitmq.config;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMqConfig 
    
        /**
        * 注册一个 MessageConverter, 发送消息时可以直接发送一个POJO
        **/
    	@Bean
    	public MessageConverter messageConverter() 
    		return new Jackson2JsonMessageConverter();
    	
    
    	/**
    	 * 新建一个队列, 队列名为 yogurt
    	 * **/
    	@Bean
    	public Queue yogurt() 
    		return new Queue("yogurt");
    	
    
        /**
        * 配置consumer工厂
        * **/
    	@Bean
    	public SimpleRabbitListenerContainerFactory consumerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
    	                                                            ConnectionFactory connectionFactory) 
    		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    		// consumer 的 prefetch 设置
            factory.setPrefetchCount(30);
    		// 并发配置 - 同时开启5个消费者(5个线程)
    		factory.setConcurrentConsumers(5);
            // 最大并发配置 (当消息堆积时, 会新开线程来处理, 最大能到20个)
            // 有点类似jdk的线程池
    		factory.setMaxConcurrentConsumers(20);
            // 消费者开启 手动ack 机制
    		factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            // 接收消息时, 可以直接将消息反序列化为 POJO
    		factory.setMessageConverter(new Jackson2JsonMessageConverter());
    		configurer.configure(factory, connectionFactory);
    		return factory;
    	
    
    
  5. 定义一个POJO,表示发送到rabbitmq的消息

    public class UserInfo implements Serializable 
    
    	private String name;
    
    	private Integer age;
    
    	private String career;
    
    	private String gender;
    
    	private String hometown;
        
        // 省略了构造函数和 getter/setter
    
    
  6. 创建生产者

    package com.demo.rabbitmq.component;
    
    import com.demo.rabbitmq.data.UserInfo;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Profile;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    /**
     * @Author yogurtzzz
     * @Date 2021/12/15 14:55
     **/
    @Profile("sender")
    @Component
    public class RabbitMqSender 
    
    	private int cnt = 0;
    
        // 由 rabbitmq-starter 自动注册进来的, 其实现目前只有1个  RabbitTemplate 
        // 但为了依赖于接口, 最好用 AmqpTemplate 来接收
    	@Autowired
    	private AmqpTemplate template;
    
        // 这里的 Queue 就是前面配置的名称为 yogurt 的队列
    	@Autowired
    	private Queue queue;
    
    
    	/**
    	 * 每4秒发送一条消息
    	 * */
    	@Scheduled(fixedRate = 5000, initialDelay = 2000)
    	public void send() 
    		cnt++;
    		UserInfo info = new UserInfo("yogurt-" + cnt, 26, "Software Engineer", "Male", "China");
    		// 发送一个 UserInfo 对象到 rabbitmq
            template.convertAndSend(queue.getName(), info);
    		Systemandroid第一行代码--学习笔记(更新中ing)(代码片段)
    

    ...觉得对自己目前有用的内容,目前作者也是刚刚入门学习阶段,如果有错误希望大家多多指正。PS:为什么我会写这几句没用的东西(因为昨天写的笔记突然好多人看。。。我慌了)只是分享看书而已!最后 查看详情

    android入门学习笔记(更新中ing)(代码片段)

    Android入门知识梳理Activity篇关于activity的理解理解一:activity是提供界面(UI)的一种组件。理解二:Activity用于显示用户界面,用户通过Activity交互完成相关操作。activity的生命周期-onCreate():在Activity第一次被创建时... 查看详情

    linux开发工具使用(持续学习更新ing……)

    一、Nginx的使用1.启动nginx1$nginx-c/自定义位置/nginx.conf2.停止nginx$nginx-sstop3.重启nginx$nginx-sreload二、vi/vim的使用$?<string>-搜索指定字符串 查看详情

    javaweb学习笔记(持续更新)(代码片段)

    javaweb学习笔记一、JavaWeb简介二、认识Servlet1、什么是Servlet?2、请求路径3、tomcat4、Servlet的使用三、Servlet简单应用1、创建Servlet的方式:2、Serlvet的生命周期3、路径与servlet绑定的方式4、servlet种的request和response4、服务器... 查看详情

    javaweb学习笔记(持续更新)(代码片段)

    javaweb学习笔记一、JavaWeb简介二、认识Servlet1、什么是Servlet?2、请求路径3、tomcat4、Servlet的使用三、Servlet简单应用1、创建Servlet的方式:2、Serlvet的生命周期3、路径与servlet绑定的方式4、servlet种的request和response4、服务器... 查看详情

    物联网使能服务--笔记(持续更新ing)

    一、产品功能1.终端接入T-Link协议简介:端云交互协议,针对2/3/4G移动蜂窝网络及wifi网络的物联网场景特点:不同的序号组用于不同的目的,应答报文的业务层报文需要和请求报文一致长连接模式,终端需通过心跳报文... 查看详情

    linux学习笔记持续更新(代码片段)

    Linux笔记安装部分:分区注意:swap虚拟内存为内存的两倍例如4G(如果大的话就不用设置)boot分区主引导500M/根全部分配还可以创一个/data,可以用来挂载东西用配置部分:hostnamectlset-hostnamename#设置主机名... 查看详情

    持续更新uni-app学习笔记(代码片段)

    优先级1)官方手册优先uni-app官网uni-app:一个使用Vue.js开发跨平台应用的前端框架https://uniapp.dcloud.io/README2)面向效果找组件https://hellouniapp.dcloud.net.cn/pages/component/view/viewhttps://hellouniapp.dcloud.net 查看详情

    持续更新uni-app学习笔记(代码片段)

    2022.5.19updated 优先级1)官方手册优先uni-app官网uni-app:一个使用Vue.js开发跨平台应用的前端框架https://uniapp.dcloud.io/README2)面向效果找组件内置组件扩展组件https://hellouniapp.dcloud.net.cn/pages/component/view/view3 查看详情

    firefox所支持的全部标签(持续更新ing)

    近期研究上各个浏览器的差别,得到一些资料,FireFox眼下所支持的全部标签类型,持续更新,供大家參考和学习,不喜勿喷哦http://mxr.mozilla.org/seamonkey/source/parser/htmlparser/src/nsElementTable.cpp 查看详情

    nowcodertop12-16——持续更新ing(代码片段)

    TOP12.单链表的排序publicclassSolution/***@paramheadListNode类theheadnode*@returnListNode类*/publicListNodesortInList(ListNodehead)//法二:转化为数组排序ArrayList<Integer>nums=newArrayList();Lis 查看详情

    大饼博士的神经网络/机器学习算法收录合集:2020年整理,持续更新ing

    文章目录ReversibleResidualNetwork异步SGD的延迟补偿算法,DC-ASGD,2016年CoordConv,Uber,2018[1][2]NullHop,稀疏CNN计算加速器,ETHZurich,2018Learningrate,batchsizeandminima[3]AnEmpiricalModelof 查看详情

    nowcodertop23-27——持续更新ing(代码片段)

    TOP23.二叉树的前序遍历publicclassSolution/***代码中的类名、方法名、参数名已经指定,请勿修改,直接返回方法规定的值即可*@paramrootTreeNode类*@returnint整型一维数组*/publicvoidpreorder(List<Integer>list,TreeNoderoot)//空节点... 查看详情

    javascript高级学习笔记目录(持续更新)

    【JavaScript高级】this绑定、绑定优先级、相关面试题与箭头函数【JavaScript高级】浏览器原理:渲染引擎解析页面步骤、回流和重绘、composite合成、defer与async【JavaScript高级】JavaScript的运行原理:V8引擎,JS代码执行原... 查看详情

    javascript高级学习笔记目录(持续更新)

    【JavaScript高级】this绑定、绑定优先级、相关面试题与箭头函数【JavaScript高级】浏览器原理:渲染引擎解析页面步骤、回流和重绘、composite合成、defer与async【JavaScript高级】JavaScript的运行原理:V8引擎,JS代码执行原... 查看详情

    nowcodertop28-34二叉树——持续更新ing(代码片段)

    TOP28.二叉树的最大深度publicclassSolution/***@paramrootTreeNode类*@returnint整型*/publicintmaxDepth(TreeNoderoot)//层序遍历计算深度intcount=0;if(root==null)return0;//队列Queue<TreeNode>queue= 查看详情

    博弈论(gametheory)入门学习笔记(持续更新)

    博弈论(GameTheory)入门学习笔记(持续更新)课程介绍1-1Taste-Backoff1-2Self-InterestedAgentsandUtilityTheory1-3Define1-4Examples1-5NashEquilibriumIntro1-6StrategicReasoning1-7BestResponseandNas 查看详情

    spring学习笔记(持续更新)

    1.spring替我们创建的bean,它放在哪里了?通常而言,我们一般会用到两种bean:singleton和prototype。对于singleton的bean,一旦创建过后,spring会把它存到一个map里面。DefaultSingletonBeanRegistry类/**Cacheofsingletonobjects:beanname-->beaninstance*/pr... 查看详情