springboot(十三):springboot整合rabbitmq

极客挖掘机      2022-05-20     164

关键词:

1. 前言

RabbitMQ 是一个消息队列,说到消息队列,大家可能多多少少有听过,它主要的功能是用来实现应用服务的异步与解耦,同时也能起到削峰填谷、消息分发的作用。

消息队列在比较主要的一个作用是用来做应用服务的解耦,消息从消息的生产者传递到消息队列,消费者从消息队列中获取消息并进行消费,生产者不需要管是谁在消费消息,消费者也无需关注消息是由谁来生产的。在分布式的系统中,消息队列也会被用在其他地方,比如分布式事务的支持,代表如阿里开源的 RocketMQ 。

当然,我们本篇文章的主角还是 RabbitMQ 。

2. RabbitMQ 介绍

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

3. 概念介绍

在普通的消息队列的设计中,一般会有这么几个概念:生产者、消费者和我们的队列。但是在 RabbitMQ 中,中间增加了一层,叫交换机(Exchange),这样,消息的投递就不由生产者来决定投递至哪个队列,而消息是直接投递至交换机的,由交换机根据调度策略来决定这个消息需要投递到哪个队列。如图:

  • 左侧的 P 代表消息的生产者
  • 紫色的 X 代表交换机
  • 右侧红色的代表队列

4. 交换机(Exchange)

那么为什么我们需要 Exchange 而不是直接将消息发送至队列呢?

AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。

Exchange 收到消息时,他是如何知道需要发送至哪些 Queue 呢?这里就需要了解 Binding 和 RoutingKey 的概念:

Binding 表示 Exchange 与 Queue 之间的关系,我们也可以简单的认为队列对该交换机上的消息感兴趣,绑定可以附带一个额外的参数 RoutingKey。Exchange 就是根据这个 RoutingKey 和当前 Exchange 所有绑定的 Binding 做匹配,如果满足匹配,就往 Exchange 所绑定的 Queue 发送消息,这样就解决了我们向 RabbitMQ 发送一次消息,可以分发到不同的 Queue。RoutingKey 的意义依赖于交换机的类型。

下面就来了解一下 Exchange 的三种主要类型:Fanout、Direct 和 Topic。

4.1 Direct Exchange

Direct Exchange 是 RabbitMQ 默认的 Exchange,完全根据 RoutingKey 来路由消息。设置 Exchange 和 Queue 的 Binding 时需指定 RoutingKey(一般为 Queue Name),发消息时也指定一样的 RoutingKey,消息就会被路由到对应的Queue。

4.2 Topic Exchange

Topic Exchange 和 Direct Exchange 类似,也需要通过 RoutingKey 来路由消息,区别在于Direct Exchange 对 RoutingKey 是精确匹配,而 Topic Exchange 支持模糊匹配。分别支持 *# 通配符,* 表示匹配一个单词, # 则表示匹配没有或者多个单词。

4.3 Headers Exchange

Headers Exchange 会忽略 RoutingKey 而根据消息中的 Headers 和创建绑定关系时指定的 Arguments 来匹配决定路由到哪些 Queue。

Headers Exchange 的性能比较差,而且 Direct Exchange 完全可以代替它,所以不建议使用。

4.4 Default Exchange

Default Exchange 是一种特殊的 Direct Exchange。当你手动创建一个队列时,后台会自动将这个队列绑定到一个名称为空的 Direct Exchange 上,绑定 RoutingKey 与队列名称相同。有了这个默认的交换机和绑定,使我们只关心队列这一层即可,这个比较适合做一些简单的应用。

5. Spring Boot 整合 RabbitMQ

Spring Boot 整合 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了 spring-boot-starter-amqp 项目对消息各种支持。

5.1 简单使用

引入依赖

代码清单:spring-boot-rabbitmq/pom.xml


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件 application.yml 如下:

代码清单:spring-boot-rabbitmq/src/main/resources/application.yml


server:
  port: 8080
spring:
  application:
    name: spring-boot-rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin

队列配置

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/QueueConfig.java


@Configuration
public class QueueConfig {
    @Bean
    public Queue simpleQueue() {
        return new Queue("simple");
    }

    @Bean
    public Queue simpleOneToMany() {
        return new Queue("simpleOneToMany");
    }
}

消息提供者

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/simple/SimpleSend.java


@Component
public class SimpleSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
  
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String message = "Hello Spring Boot " + simpleDateFormat.format(new Date());
        amqpTemplate.convertAndSend("simple", message);
        logger.info("消息推送成功!");
    }
}

消息消费者

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/simple/SimpleReceive.java


@Component
@RabbitListener(queues = "simple")
public class SimpleReceive {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitHandler
    public void process(String message) {
        logger.info("Receive :{}", message);
    }

}

测试

代码清单:spring-boot-rabbitmq/src/test/java/com/springboot/springbootrabbitmq/DemoApplicationTests.java


@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {

    @Autowired
    SimpleSend simpleSend;

    @Test
    public void simpleSend() {
        simpleSend.send();
    }

}

5.2 一对多使用

如果有一个消息的生产者,有 N 个消息的消费者,会发生什么呢?

对上面的代码稍作改动,增加一个消息的消费者。

测试代码如下:

@Test
public void simpleOneSend() {
    for (int i = 0; i < 100; i ++) {
        simpleManySend.send(i);
    }
}

测试可以看到结果是两个消费者平均的消费了生产者生产的消息。

5.3 多对多使用

我们再增加一个消息的生产者,测试代码如下:

@Test
public void simpleManySend() {
    for (int i = 0; i < 100; i ++) {
        simpleManySend.send(i);
        simpleManySend1.send(i);
    }
}

测试可以看到结果是两个消费者平均的消费了两个生产者生产的消息。

5.4 Topic Exchange

首先还是先配置 Topic ,配置代码如下:

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/TopicConfig.java


@Configuration
public class TopicConfig {

    private final String message = "topic.message";
    private final String messages = "topic.messages";

    @Bean
    public Queue queueMessage() {
        return new Queue(this.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(this.messages);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

这里队列 queueMessages 可以同时匹配两个 route_key ,而队列 queueMessage 只能匹配 topic.message 。

消息的生产者代码如下:

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/topic/TopicSend.java


@Component
public class TopicSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send1() {
        String message = "message 1";
        logger.info("send:{}", message);
        rabbitTemplate.convertAndSend("topicExchange", "topic.message", message);
    }

    public void send2() {
        String message = "message 2";
        logger.info("send:{}", message);
        rabbitTemplate.convertAndSend("topicExchange", "topic.messages", message);
    }
}

调用 send1() 消息会由 Exchange 同时转发到两个队列, 而调用 send2() 则只会转发至 receive2 。

5.5 Fanout Exchange

Fanout 就是我们熟悉的广播模式或者订阅模式,给 Fanout 交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

Fanout 配置如下:

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/FanoutConfig.java


@Configuration
public class FanoutConfig {
    @Bean
    public Queue MessageA() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue MessageB() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue MessageC() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageA).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageB).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue MessageC, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageC).to(fanoutExchange);
    }
}

消息生产者代码如下:

代码清单:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/fanout/FanoutSend.java


@Component
public class FanoutSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String message = "Hello FanoutSend.";
        logger.info("send:{}", message);
        this.rabbitTemplate.convertAndSend("fanoutExchange","", message);
    }
}

测试代码如下:

代码清单:spring-boot-rabbitmq/src/test/java/com/springboot/springbootrabbitmq/DemoApplicationTests.java


@Test
public void fanoutSend() {
    fanoutSend.send();
}

测试结果为绑定到 fanout 交换机上面的队列都收到了消息。

6. 示例代码

示例代码-Github

示例代码-Gitee

7. 参考

http://www.ityouknow.com/springboot/2016/11/30/spring-boot-rabbitMQ.html

https://blog.csdn.net/y4x5M0nivSrJaY3X92c/article/details/80416996

springboot入门二十三,整合redis

  项目基本配置参考文章SpringBoot入门一,使用myEclipse新建一个SpringBoot项目,使用MyEclipse新建一个SpringBoot项目即可,此示例springboot升级为2.2.1版本。1.pom.xml添加Redis支持<!--5.引入redis依赖--><dependency><groupId>org.springframe... 查看详情

springboot1.5.4整合druid(十三)

上一篇:springboot1.5.4整合mybatis(十二) 1      集成druid连接池springboot集成druid项目mybatis-spring-boot源码地址:https://git.oschina.net/wyait/springboot1.5.4.git 1.1 druid简介D 查看详情

springboot(十三):springboot整合rabbitmq

1.前言RabbitMQ是一个消息队列,说到消息队列,大家可能多多少少有听过,它主要的功能是用来实现应用服务的异步与解耦,同时也能起到削峰填谷、消息分发的作用。消息队列在比较主要的一个作用是用来做应用服务的解耦,... 查看详情

springboot系列(三十三):springboot如何手动连接库并获取指定表结构|超级详细,建议收藏

查看详情

java精选面试题之springboot三十三问

SpringBootSpringBoot是微服务中最好的Java框架.我们建议你能够成为一名SpringBoot的专家.问题一:SpringBoot、SpringMVC和Spring有什么区别?SpringFrameSpringFramework最重要的特征是依赖注入。所有SpringModules不是依赖注入就是IOC控制反转。当我们... 查看详情

(十三)springboot发送e-mail

一:添加mail依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency>  二:添加邮件配置打开application.properties#邮箱 查看详情

springboot学习总结(十三)jpa事务

(一)java异常  Throwable这个Java类被用来表示任何可以作为异常被抛出来的类。Throwable对象可分为两种类型(指从Throwable继承而得到的类型):Error用来表示编译时和系统错误(除特殊情况外,一般不用关心);Exception是可以... 查看详情

springboot入门十三,添加rabbitmq

一.概念说明Broker:简单来说就是消息队列服务器实体。Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。Queue:消息队列载体,每个消息都会被投入到一个或多个队列。Binding:绑定,它的作用就是把exchange和queue... 查看详情

springboot(十三)rabbitmq安装与集成

一、前言RabbitMQ是一个开源的消息代理软件(面向消息的中间件),它的核心作用就是创建消息队列,异步接收和发送消息,MQ的全程是:MessageQueue中文的意思是消息队列。##1.1使用场景-削峰填谷:用于应对间歇性流量提升对于... 查看详情

springboot实战(十三)之缓存

什么是缓存?引用下百度百科的解释:缓存就是数据交换的缓冲区(又称作Cache),当某一硬件要读取数据时,会首先从缓存中查找需要的数据,找到了则直接执行,找不到的话则从内存中查找。由于缓存的运行速度比内存快得多,故... 查看详情

基于springboot搭建java项目(二十三)——springboot使用过滤器拦截器和监听器(代码片段)

SpringBoot使用过滤器、拦截器和监听器一、SpringBoot使用过滤器Springboot过滤器的使用(两种方式)使用springboot提供的FilterRegistrationBean注册Filter使用原生servlet注解定义Filter两种方式的本质都是一样的,都是去FilterRegistrat... 查看详情

springboot(十三)cors方式实现跨域

什么是跨域?浏览器从一个域名的网页去请求另一个域名的资源时,域名、端口、协议任一不同,都是跨域。跨域资源访问是经常会遇到的场景,当一个资源从与该资源本身所在的服务器不同的域或端口请求一个资源时,资源便... 查看详情

springboot系列(三十三):springboot如何手动连接库并获取指定表结构|超级详细,建议收藏(代码片段)

👨‍🎓作者:bug菌🎉简介:在CSDN、掘金等社区优质创作者,全网合计6w粉+,对一切技术都感兴趣,重心偏java方向,目前运营公众号[猿圈奇妙屋],欢迎小伙伴们的加入,一起秃头。... 查看详情

springboot系列(三十三):springboot如何手动连接库并获取指定表结构|超级详细,建议收藏(代码片段)

👨‍🎓作者:bug菌🎉简介:在CSDN、掘金等社区优质创作者,全网合计6w粉+,对一切技术都感兴趣,重心偏java方向,目前运营公众号[猿圈奇妙屋],欢迎小伙伴们的加入,一起秃头。... 查看详情

(十三)如何实现事务(代码片段)

前面介绍了SpringBoot中的整合Mybatis并实现增删改查。不清楚的朋友可以看看之前的文章:https://www.cnblogs.com/zhangweizhong/category/1657780.html。SpringBoot整合完Mybatis,有个特别重要的功能之前忘记讲了:那就是SpringBoot如何实现事物控制... 查看详情

(十三)atp应用测试平台——springboot集成kafka案例实战(代码片段)

...:①削峰填谷②异步解耦。本节我们主要介绍一下如何在springboot项目中集成kafka消息中间键,实现简单的数据分发以及消费的案例。正文kafka集群搭建快速搭建一个kafka集群,我们这里以docker 查看详情

springboot+springcloud实现权限管理系统后端篇(二十三):配置中心(configbus)

在线演示演示地址:http://139.196.87.48:9002/kitty用户名:admin密码:admin技术背景如今微服务架构盛行,在分布式系统中,项目日益庞大,子项目日益增多,每个项目都散落着各种配置文件,且随着服务的增加而不断增多。此时,往... 查看详情

java网络商城项目springboot+springcloud+vue网络商城(ssm前后端分离项目)二十三(项目打包和部署)(代码片段)

Java网络商城项目SpringBoot+SpringCloud+Vue网络商城(SSM前后端分离项目)二十三(项目打包和部署)选择要打包的服务1、在这里我们打包商品微服务选择当前项目对应的依赖项目(1)在pom.xml当中引入对... 查看详情