关键词:
目录
一、简介
消息中间件具有一系列功能如低耦合、可靠投递、广播、流量控制、最终一致性等,成为异步RPC的主要手段之一,常见的ActiveMQ、RabbitMQ、Kafka、RocketMQ等。消息中间件主要作用如下:
- 异步处理
- 应用解耦
- 流量削峰
- 日志处理
RabbitMQ默认容器是simple容器,从2.0版本之后就多了一个容器direct容器,我们从分布式架构的角度一起来看看到底有什么不同吧。本文主要用使用Spring Boot(2.5.2)来整合RabbitMQ(2.5.2),使用simple容器实现一个消费者。本文的前提是有一个安装好的RabbitMQ的环境。
1.1 本文中注解说明
- @Configuration:用于定义配置类,被注解的类可以包含有一个或多个被@Bean注解的方法,这些方法将会被AnnotationConfigApplicationContext或AnnotationConfigWebApplicationContext类进行扫描,并用于构建bean定义,初始化Spring容器
- @RabbitListener:@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。 @RabbitListener标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理
- @RabbitHandler: 需配合 @RabbitListener注解一起使用,当收到消息后,根据 MessageConverter 转换后的参数类型调用相关的方法
二、Maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath/>
</parent>
<groupId>com.alian</groupId>
<artifactId>rabbitmq-simple</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq-simple</name>
<description>SpringBoot整合RabbitMQ之simple容器(消费者)</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>$parent.version</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>$parent.version</version>
</dependency>
<!--rabbitMq的版本 版本最好和springboot保持一致-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>$parent.version</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!--用于序列化-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10</version>
</dependency>
<!--java 8时间序列化-->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.9.10</version>
</dependency>
<!--自己打包上传到私服的,用于测试-->
<dependency>
<groupId>com.alian</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
这里需要注意的是下面这个包,是我本人打包到私服的,其实一个员工类,支付类,加上一个常量类,大家也自己打包自己的实体到私服,或者直接通过模块开发的方式实现我这个实例。
<dependency>
<groupId>com.alian</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
三、配置类
3.1 基础配置
本文开始说了,会从分布式架构的角度来完成本次整合。我们会用两个系统消费者和生产者来完成本次的测试,采用这种麻烦的方式,不是不会单元测试或者模块开发,而是为了避免大家踩坑,这样也比较贴近我们实际开发。所以我提前准备了几个简单的类(MQConstants.java、Employee.java、PayRecord.java)打成一个jar包到maven私服,一个配置类存放MQ常量,一个员工类,一个支付类,里面的属性都是常用类型,具体如下。
MQConstants .java
package com.alian.common.constant;
public class MQConstants
/**
* 交换机
*/
public final static String ALIAN_EXCHANGE_NAME = "ALIAN_EXCHANGE";
/**
* 队列名
*/
public final static String ALIAN_QUEUE_NAME = "ALIAN_QUEUE";
/**
* 路由key
*/
public final static String ALIAN_ROUTINGKEY_NAME = "ALIAN_ROUTINGKEY";
Employee.java
package com.alian.common.dto;
import java.io.Serializable;
import java.time.LocalDate;
import java.util.Objects;
public class Employee implements Serializable
private static final long serialVersionUID = 1L;
/**
* 员工编号
*/
private String id = "";
/**
* 员工姓名
*/
private String name = "";
/**
* 员工年龄
*/
private int age;
/**
* 工资
*/
private double salary = 0.00;
/**
* 部门
*/
private String department = "";
/**
* 入职时间
*/
private LocalDate hireDate = LocalDate.of(1970, 1, 1);
/**
* 注意:被序列化对象应提供一个无参的构造函数,否则会抛出异常
*/
public Employee()
public String getId()
return id;
public void setId(String id)
this.id = id;
public String getName()
return name;
public void setName(String name)
this.name = name;
public int getAge()
return age;
public void setAge(int age)
this.age = age;
public double getSalary()
return salary;
public void setSalary(double salary)
this.salary = salary;
public String getDepartment()
return department;
public void setDepartment(String department)
this.department = department;
public LocalDate getHireDate()
return hireDate;
public void setHireDate(LocalDate hireDate)
this.hireDate = hireDate;
@Override
public String toString()
return "Employee" +
"id='" + id + '\\'' +
", name='" + name + '\\'' +
", age=" + age +
", salary=" + salary +
", department='" + department + '\\'' +
", hireDate=" + hireDate +
'';
@Override
public boolean equals(Object o)
if (this == o) return true;
if (!(o instanceof Employee)) return false;
Employee employee = (Employee) o;
return getAge() == employee.getAge() &&
Double.compare(employee.getSalary(), getSalary()) == 0 &&
Objects.equals(getId(), employee.getId()) &&
Objects.equals(getName(), employee.getName()) &&
Objects.equals(getDepartment(), employee.getDepartment()) &&
Objects.equals(getHireDate(), employee.getHireDate());
@Override
public int hashCode()
return Objects.hash(getId(), getName(), getAge(), getSalary(), getDepartment(), getHireDate());
PayRecord.java
package com.alian.common.dto;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Objects;
public class PayRecord implements Serializable
private static final long serialVersionUID = 1L;
/**
* 支付流水
*/
private String payTranSeq = "";
/**
* 支付金额(单位分)
*/
private int payAmount;
/**
* 支付方式(00:现金,01:微信,02:支付宝,03:银联,04:其他)
*/
private String payType = "01";
/**
* 支付状态(00:支付成功,01:待支付,02:支付失败,03:已取消)
*/
private String status = "01";
/**
* 支付时间
*/
private LocalDateTime payTime = LocalDateTime.now();
/**
* 第三方流水
*/
private String payNo = "";
/**
* 注意:被序列化对象应提供一个无参的构造函数,否则会抛出异常
*/
public PayRecord()
public String getPayTranSeq()
return payTranSeq;
public void setPayTranSeq(String payTranSeq)
this.payTranSeq = payTranSeq;
public int getPayAmount()
return payAmount;
public void setPayAmount(int payAmount)
this.payAmount = payAmount;
public String getPayType()
return payType;
public void setPayType(String payType)
this.payType = payType;
public String getStatus()
return status;
public void setStatus(String status)
this.status = status;
public LocalDateTime getPayTime()
return payTime;
public void setPayTime(LocalDateTime payTime)
this.payTime = payTime;
public String getPayNo()
return payNo;
public void setPayNo(String payNo)
this.payNo = payNo;
@Override
public String toString()
return "PayRecord" +
"payTranSeq='" + payTranSeq + '\\'' +
", payAmount=" + payAmount +
", payType='" + payType + '\\'' +
", status='" + status + '\\'' +
", payTime=" + payTime +
", payNo='" + payNo + '\\'' +
'';
@Override
public boolean equals(Object o)
if (this == o) return true;
if (!(o instanceof PayRecord)) return false;
PayRecord payRecord = (PayRecord) o;
return getPayAmount() == payRecord.getPayAmount() &&
Objects.equals(getPayTranSeq(), payRecord.getPayTranSeq()) &&
Objects.equals(getPayType(), payRecord.getPayType()) &&
Objects.equals(getStatus(), payRecord.getStatus()) &&
Objects.equals(getPayTime(), payRecord.getPayTime()) &&
Objects.equals(getPayNo(), payRecord.getPayNo());
@Override
public int hashCode()
return Objects.hash(getPayTranSeq(), getPayAmount(), getPayType(), getStatus(), getPayTime(), getPayNo());
3.2 交换机、路由、队列配置
具体的解释,我相信代码里说得很清楚了。com.alian.common.constant.MQConstants是我公共包里的,具体的代码在上面的MQConstants.java
ExchangeConfig.java
package com.alian.rabbitmq.config;
import com.alian.common.constant.MQConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org查看详情
springboot整合rabbitmq报错
org.springframework.amqp.AmqpAuthenticationException:com.rabbitmq.client.AuthenticationFailureException:ACCESS_REFUSED-LoginwasrefusedusingauthenticationmechanismPLAIN.Fordetailsseethebrokerlogfile. 查看详情
springboot2.x-springboot整合amqp之rabbitmq
文章目录SpringBoot2.X-SpringBoot整合AMQP之RabbitMQRabbitMQ简介引入依赖编写配置编写接口启用Rabbit注解消息监听消息测试SpringBoot2.X-SpringBoot整合AMQP之RabbitMQSpringBoot2整合RabbitMQ案例。RabbitMQ简介简介RabbitMQ是一个由erlang开发的AMQP(AdvanvedMess... 查看详情
springboot整合rabbitmq
1整合RabbitMQ1.1RabbitMQ的相关概念组成部分队列(Queue)声明队列```java@BeanpublicQueueaddUserQueue(){returnnewQueue("demo-user-add");}```交换机(Exchange)用于转发消息,但是它不会做存储,如果没有Queuebind到Exchange的话,它会直接丢弃掉Producer发... 查看详情
springboot整合rabbitmq
记录学习过程从我做起。搞个备份有备无患加入依赖配置yml文件注入RabbitTemplate测试生产者功能消费端实现消费端结果 查看详情
springboot整合rabbitmq转载https://www.cnblogs.com/hlhdidi/p/6535677.html(代码片段)
springboot学习笔记-6springboot整合RabbitMQ一RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让... 查看详情
springboot整合rabbitmq(代码片段)
SpringBoot整合RabbitMQ搭建环境创建测试项目:test_rabbitmq_boot添加依赖<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi& 查看详情
spring和springboot整合rabbitmq(代码片段)
Spring和SpringBoot整合RabbitMQ一、Spring整合RabbitMQ1.Producer1.1Config1.1Producer2.Consumer拉取推送消息2.1Config2.2Consumer3.Consumer消息监听(用于推消息)3.1Config3.2MessageListener3.3Consumer二、SpringBoot整合Ra 查看详情
springboot整合rabbitmq(新手整合请勿喷)
整合前先在springboot引入rabbitMqJAR包,版本号可以为自己自定义,本项目是跟随springboot的版本<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></ 查看详情
企业级springboot教程(十五)springboot整合rabbitmq
...务器,并且通过它怎么去发送和接收消息。我将构建一个springboot工程,通过RabbitTemplate去通过MessageListenerAdapter去订阅一个POJO类型的消息。准备工作15minIDEAmaven3.0在开始构建项目之前,机器需要安装rabbitmq,你可以去官网下载,htt... 查看详情
springboot整合rabbitmq入门~~
SpringBoot整合RabbitMQ 入门2020-01-12 创建生产者类,并且在yml配置文件中配置5要素连接MQyml配置文件 spring: & 查看详情
springboot整合rabbitmq
当前社区活跃度最好的消息中间件就是kafka和rabbitmq了,前面对kafaka的基础使用做了一些总结,最近开始研究rabbitmq,查看了很多资料,自己仿着写了一些demo,在博客园记录一下。rabbitmq基础知识 关于rabbitmq基础知识,可... 查看详情
[rabbitmq]整合springboot(代码片段)
整合SpringBoot创建项目引入依赖<dependencies><!--RabbitMQ依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>< 查看详情
springboot整合rabbitmq
SpringBoot整合RabbitMQ公司最近在开发CRM系统的时候,需要将ERP的订单数据实时的传输到CRM系统中,但是由于每天的订单量特别大,采用实时获取后并存储到数据库中,接口的相应速度较慢,性能较差。经过经过多方位评估采用在数... 查看详情
springboot学习——springboot快速整合rabbitmq
RabbitMQ消息队列@[toc]简介优点erlang开发,并发能力强。社区活跃,使用的人多,稳定性较强。延时低缺点erlang语言开发的,国内精通的不多,日后定制开发困难。RabbitMQ工作模式1,"HelloWorld!"模式简单模式是RabbitMQ最简单入... 查看详情
springboot整合消息队列——rabbitmq
...息路由到bindingkey与routingkey模式匹配的队列中。这里基于springboot整合消息队列,测试这 查看详情
springboot整合rabbitmq之发送接收消息实战
实战前言前几篇文章中,我们介绍了SpringBoot整合RabbitMQ的配置以及实战了Spring的事件驱动模型,这两篇文章对于我们后续实战RabbitMQ其他知识要点将起到奠基的作用的。特别是Spring的事件驱动模型,当我们全篇实战完毕RabbitMQ并... 查看详情
springboot整合rabbitmq
本文序列化和添加package参考:https://www.jianshu.com/p/13fd9ff0648dRabbitMq安装[root@topcheer~]#dockerimagesREPOSITORYTAGIMAGEIDCREATEDSIZEelasticsearchlatest874179f1960311daysago771MBsprin 查看详情