rabbitmq笔记springboot整合rabbitmq之simple容器(消费者)(代码片段)

嘉禾嘉宁papa 嘉禾嘉宁papa     2022-12-16     205

关键词:

一、简介

  消息中间件具有一系列功能如低耦合、可靠投递、广播、流量控制、最终一致性等,成为异步RPC的主要手段之一,常见的ActiveMQ、RabbitMQ、Kafka、RocketMQ等。消息中间件主要作用如下:

  • 异步处理
  • 应用解耦
  • 流量削峰
  • 日志处理

  RabbitMQ默认容器是simple容器,从2.0版本之后就多了一个容器direct容器,我们从分布式架构的角度一起来看看到底有什么不同吧。本文主要用使用Spring Boot(2.5.2)来整合RabbitMQ(2.5.2),使用simple容器实现一个消费者。本文的前提是有一个安装好的RabbitMQ的环境。

1.1 本文中注解说明

  • @Configuration:用于定义配置类,被注解的类可以包含有一个或多个被@Bean注解的方法,这些方法将会被AnnotationConfigApplicationContextAnnotationConfigWebApplicationContext类进行扫描,并用于构建bean定义,初始化Spring容器
  • @RabbitListener:@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。 @RabbitListener标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理
  • @RabbitHandler: 需配合 @RabbitListener注解一起使用,当收到消息后,根据 MessageConverter 转换后的参数类型调用相关的方法

二、Maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
        <relativePath/>
    </parent>

    <groupId>com.alian</groupId>
    <artifactId>rabbitmq-simple</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-simple</name>
    <description>SpringBoot整合RabbitMQ之simple容器(消费者)</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>$parent.version</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>$parent.version</version>
        </dependency>

        <!--rabbitMq的版本 版本最好和springboot保持一致-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>$parent.version</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <!--用于序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.10</version>
        </dependency>

        <!--java 8时间序列化-->
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.9.10</version>
        </dependency>

        <!--自己打包上传到私服的,用于测试-->
        <dependency>
            <groupId>com.alian</groupId>
            <artifactId>common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

  这里需要注意的是下面这个包,是我本人打包到私服的,其实一个员工类,支付类,加上一个常量类,大家也自己打包自己的实体到私服,或者直接通过模块开发的方式实现我这个实例。

<dependency>
    <groupId>com.alian</groupId>
    <artifactId>common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

三、配置类

3.1 基础配置

  本文开始说了,会从分布式架构的角度来完成本次整合。我们会用两个系统消费者和生产者来完成本次的测试,采用这种麻烦的方式,不是不会单元测试或者模块开发,而是为了避免大家踩坑,这样也比较贴近我们实际开发。所以我提前准备了几个简单的类(MQConstants.javaEmployee.javaPayRecord.java)打成一个jar包到maven私服,一个配置类存放MQ常量,一个员工类,一个支付类,里面的属性都是常用类型,具体如下。

MQConstants .java

package com.alian.common.constant;

public class MQConstants 

    /**
     * 交换机
     */
    public final static String ALIAN_EXCHANGE_NAME = "ALIAN_EXCHANGE";

    /**
     * 队列名
     */
    public final static String ALIAN_QUEUE_NAME = "ALIAN_QUEUE";

    /**
     * 路由key
     */
    public final static String ALIAN_ROUTINGKEY_NAME = "ALIAN_ROUTINGKEY";

Employee.java

package com.alian.common.dto;

import java.io.Serializable;
import java.time.LocalDate;
import java.util.Objects;

public class Employee implements Serializable 

    private static final long serialVersionUID = 1L;

    /**
     * 员工编号
     */
    private String id = "";

    /**
     * 员工姓名
     */
    private String name = "";

    /**
     * 员工年龄
     */
    private int age;

    /**
     * 工资
     */
    private double salary = 0.00;

    /**
     * 部门
     */
    private String department = "";

    /**
     * 入职时间
     */
    private LocalDate hireDate = LocalDate.of(1970, 1, 1);

    /**
     * 注意:被序列化对象应提供一个无参的构造函数,否则会抛出异常
     */
    public Employee() 

    

    public String getId() 
        return id;
    

    public void setId(String id) 
        this.id = id;
    

    public String getName() 
        return name;
    

    public void setName(String name) 
        this.name = name;
    

    public int getAge() 
        return age;
    

    public void setAge(int age) 
        this.age = age;
    

    public double getSalary() 
        return salary;
    

    public void setSalary(double salary) 
        this.salary = salary;
    

    public String getDepartment() 
        return department;
    

    public void setDepartment(String department) 
        this.department = department;
    

    public LocalDate getHireDate() 
        return hireDate;
    

    public void setHireDate(LocalDate hireDate) 
        this.hireDate = hireDate;
    

    @Override
    public String toString() 
        return "Employee" +
                "id='" + id + '\\'' +
                ", name='" + name + '\\'' +
                ", age=" + age +
                ", salary=" + salary +
                ", department='" + department + '\\'' +
                ", hireDate=" + hireDate +
                '';
    

    @Override
    public boolean equals(Object o) 
        if (this == o) return true;
        if (!(o instanceof Employee)) return false;
        Employee employee = (Employee) o;
        return getAge() == employee.getAge() &&
                Double.compare(employee.getSalary(), getSalary()) == 0 &&
                Objects.equals(getId(), employee.getId()) &&
                Objects.equals(getName(), employee.getName()) &&
                Objects.equals(getDepartment(), employee.getDepartment()) &&
                Objects.equals(getHireDate(), employee.getHireDate());
    

    @Override
    public int hashCode() 
        return Objects.hash(getId(), getName(), getAge(), getSalary(), getDepartment(), getHireDate());
    

PayRecord.java

package com.alian.common.dto;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Objects;

public class PayRecord implements Serializable 

    private static final long serialVersionUID = 1L;

    /**
     * 支付流水
     */
    private String payTranSeq = "";

    /**
     * 支付金额(单位分)
     */
    private int payAmount;

    /**
     * 支付方式(00:现金,01:微信,02:支付宝,03:银联,04:其他)
     */
    private String payType = "01";

    /**
     * 支付状态(00:支付成功,01:待支付,02:支付失败,03:已取消)
     */
    private String status = "01";

    /**
     * 支付时间
     */
    private LocalDateTime payTime = LocalDateTime.now();

    /**
     * 第三方流水
     */
    private String payNo = "";

    /**
     * 注意:被序列化对象应提供一个无参的构造函数,否则会抛出异常
     */
    public PayRecord() 

    

    public String getPayTranSeq() 
        return payTranSeq;
    

    public void setPayTranSeq(String payTranSeq) 
        this.payTranSeq = payTranSeq;
    

    public int getPayAmount() 
        return payAmount;
    

    public void setPayAmount(int payAmount) 
        this.payAmount = payAmount;
    

    public String getPayType() 
        return payType;
    

    public void setPayType(String payType) 
        this.payType = payType;
    

    public String getStatus() 
        return status;
    

    public void setStatus(String status) 
        this.status = status;
    

    public LocalDateTime getPayTime() 
        return payTime;
    

    public void setPayTime(LocalDateTime payTime) 
        this.payTime = payTime;
    

    public String getPayNo() 
        return payNo;
    

    public void setPayNo(String payNo) 
        this.payNo = payNo;
    

    @Override
    public String toString() 
        return "PayRecord" +
                "payTranSeq='" + payTranSeq + '\\'' +
                ", payAmount=" + payAmount +
                ", payType='" + payType + '\\'' +
                ", status='" + status + '\\'' +
                ", payTime=" + payTime +
                ", payNo='" + payNo + '\\'' +
                '';
    

    @Override
    public boolean equals(Object o) 
        if (this == o) return true;
        if (!(o instanceof PayRecord)) return false;
        PayRecord payRecord = (PayRecord) o;
        return getPayAmount() == payRecord.getPayAmount() &&
                Objects.equals(getPayTranSeq(), payRecord.getPayTranSeq()) &&
                Objects.equals(getPayType(), payRecord.getPayType()) &&
                Objects.equals(getStatus(), payRecord.getStatus()) &&
                Objects.equals(getPayTime(), payRecord.getPayTime()) &&
                Objects.equals(getPayNo(), payRecord.getPayNo());
    

    @Override
    public int hashCode() 
        return Objects.hash(getPayTranSeq(), getPayAmount(), getPayType(), getStatus(), getPayTime(), getPayNo());
    

3.2 交换机、路由、队列配置

  具体的解释,我相信代码里说得很清楚了。com.alian.common.constant.MQConstants是我公共包里的,具体的代码在上面的MQConstants.java

ExchangeConfig.java

package com.alian.rabbitmq.config;

import com.alian.common.constant.MQConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org查看详情  

springboot整合rabbitmq报错

org.springframework.amqp.AmqpAuthenticationException:com.rabbitmq.client.AuthenticationFailureException:ACCESS_REFUSED-LoginwasrefusedusingauthenticationmechanismPLAIN.Fordetailsseethebrokerlogfile.   查看详情

springboot2.x-springboot整合amqp之rabbitmq

文章目录SpringBoot2.X-SpringBoot整合AMQP之RabbitMQRabbitMQ简介引入依赖编写配置编写接口启用Rabbit注解消息监听消息测试SpringBoot2.X-SpringBoot整合AMQP之RabbitMQSpringBoot2整合RabbitMQ案例。RabbitMQ简介简介RabbitMQ是一个由erlang开发的AMQP(AdvanvedMess... 查看详情

springboot整合rabbitmq

1整合RabbitMQ1.1RabbitMQ的相关概念组成部分队列(Queue)声明队列```java@BeanpublicQueueaddUserQueue(){returnnewQueue("demo-user-add");}```交换机(Exchange)用于转发消息,但是它不会做存储,如果没有Queuebind到Exchange的话,它会直接丢弃掉Producer发... 查看详情

springboot整合rabbitmq

记录学习过程从我做起。搞个备份有备无患加入依赖配置yml文件注入RabbitTemplate测试生产者功能消费端实现消费端结果 查看详情

springboot整合rabbitmq转载https://www.cnblogs.com/hlhdidi/p/6535677.html(代码片段)

springboot学习笔记-6springboot整合RabbitMQ一RabbitMQ的介绍    RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让... 查看详情

springboot整合rabbitmq(代码片段)

SpringBoot整合RabbitMQ搭建环境创建测试项目:test_rabbitmq_boot添加依赖<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi& 查看详情

spring和springboot整合rabbitmq(代码片段)

Spring和SpringBoot整合RabbitMQ一、Spring整合RabbitMQ1.Producer1.1Config1.1Producer2.Consumer拉取推送消息2.1Config2.2Consumer3.Consumer消息监听(用于推消息)3.1Config3.2MessageListener3.3Consumer二、SpringBoot整合Ra 查看详情

springboot整合rabbitmq(新手整合请勿喷)

整合前先在springboot引入rabbitMqJAR包,版本号可以为自己自定义,本项目是跟随springboot的版本<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></ 查看详情

企业级springboot教程(十五)springboot整合rabbitmq

...务器,并且通过它怎么去发送和接收消息。我将构建一个springboot工程,通过RabbitTemplate去通过MessageListenerAdapter去订阅一个POJO类型的消息。准备工作15minIDEAmaven3.0在开始构建项目之前,机器需要安装rabbitmq,你可以去官网下载,htt... 查看详情

springboot整合rabbitmq入门~~

SpringBoot整合RabbitMQ 入门2020-01-12        创建生产者类,并且在yml配置文件中配置5要素连接MQyml配置文件      spring:      & 查看详情

springboot整合rabbitmq

  当前社区活跃度最好的消息中间件就是kafka和rabbitmq了,前面对kafaka的基础使用做了一些总结,最近开始研究rabbitmq,查看了很多资料,自己仿着写了一些demo,在博客园记录一下。rabbitmq基础知识  关于rabbitmq基础知识,可... 查看详情

[rabbitmq]整合springboot(代码片段)

整合SpringBoot创建项目引入依赖<dependencies><!--RabbitMQ依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>< 查看详情

springboot整合rabbitmq

SpringBoot整合RabbitMQ公司最近在开发CRM系统的时候,需要将ERP的订单数据实时的传输到CRM系统中,但是由于每天的订单量特别大,采用实时获取后并存储到数据库中,接口的相应速度较慢,性能较差。经过经过多方位评估采用在数... 查看详情

springboot学习——springboot快速整合rabbitmq

RabbitMQ消息队列@[toc]简介优点erlang开发,并发能力强。社区活跃,使用的人多,稳定性较强。延时低缺点erlang语言开发的,国内精通的不多,日后定制开发困难。RabbitMQ工作模式1,"HelloWorld!"模式简单模式是RabbitMQ最简单入... 查看详情

springboot整合消息队列——rabbitmq

...息路由到bindingkey与routingkey模式匹配的队列中。这里基于springboot整合​​消息队列​​,测试这 查看详情

springboot整合rabbitmq之发送接收消息实战

实战前言前几篇文章中,我们介绍了SpringBoot整合RabbitMQ的配置以及实战了Spring的事件驱动模型,这两篇文章对于我们后续实战RabbitMQ其他知识要点将起到奠基的作用的。特别是Spring的事件驱动模型,当我们全篇实战完毕RabbitMQ并... 查看详情

springboot整合rabbitmq

    本文序列化和添加package参考:https://www.jianshu.com/p/13fd9ff0648dRabbitMq安装[root@topcheer~]#dockerimagesREPOSITORYTAGIMAGEIDCREATEDSIZEelasticsearchlatest874179f1960311daysago771MBsprin 查看详情