springboot-springboot集成rabbitmq

MinggeQingchun      2022-06-10     164

关键词:

一、SpringBoot集成RabbitMQ

创建两个模块,一个命名springboot-send,一个命名springboot-receive

在两个工程的 pom.xml配置文件中引入AMQP依赖

<dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--添加AMQP的起步依赖,添加成功后就会自动引入RabbitMQ的依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

核心配置文件application.properties文件中

#配置RabbitMQ相关连接信息(单机版)
spring.rabbitmq.host=192.168.133.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
#连接超时,单位毫秒,0表示无穷大,不超时
spring.rabbitmq.connection-timeout=0

#配置RabbitMQ相关连接信息(集群版)
#spring.rabbitmq.addresses=192.168.133.129:5672,192.168.133.130:5672
#spring.rabbitmq.username=root
#spring.rabbitmq.password=root

1、direct交换机

消息发送方

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig 

    //配置Direct类型一个队列
    @Bean
    public Queue directQueue()
        return new Queue("bootDirectQueue");
    

    //配置一个Direct类型的交换
    @Bean
    public DirectExchange directExchange()
        return new DirectExchange("bootDirectExchange");
    

    /**
     *  配置Direct类型一个队列和交换机的绑定
     * @param directQueue  需要绑定的队列的对象,参数名必须要与某个@Bean的方法名完全相同(自动进行注入)
     * @param directExchange  需要绑定的交换机的对象,参数名必须要与某个@Bean的方法名完全相同(自动进行注入)
     * @return
     */
    @Bean
    public Binding directBinding(Queue directQueue,DirectExchange directExchange)
        /*
        参数1 需要绑定的队列
        参数2 需要绑定的交换机
        参数3 绑定时的RoutingKey
        * */
        return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
    

@Service("sendService")
public class SendServiceImpl implements SendService 
    //注入Amqp的模板类,利用这个对象来发送和接收消息
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * direct 交换机
     * @param message
     */
    public void sendDirectMessage(String message) 
        /**
         * 发送消息
         * 参数 1 为交换机名
         * 参数 2 为RoutingKey
         * 参数 3 为我们的具体发送的消息数据
         */
        amqpTemplate.convertAndSend("bootDirectExchange","bootDirectRoutingKey",message);
    
 amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message);
    

消息接收方

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig 

    //配置一个Direct类型的交换
    @Bean
    public DirectExchange directExchange()

        return new DirectExchange("bootDirectExchange");
    
    //配置一个队列
    @Bean
    public Queue directQueue()

        return new Queue("bootDirectQueue");
    

    /**
     * 配置一个队列和交换机的绑定
     * @param directQueue  需要绑定的队列的对象,参数名必须要与某个@Bean的方法名完全相同(自动进行注入)
     * @param directExchange  需要绑定的交换机的对象,参数名必须要与某个@Bean的方法名完全相同(自动进行注入)
     * @return
     */
    @Bean
    public Binding directBinding(Queue directQueue,DirectExchange directExchange)
        /*
        参数1 需要绑定的队列
        参数2 需要绑定的交换机
        参数3 绑定时的RoutingKey
        * */
        return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRoutingKey");
    

2、fanout交换机

消息发送方

@Configuration
public class RabbitMQConfig 

    //配置一个 Fanout类型的交换
    @Bean
    public FanoutExchange fanoutExchange()
        return new FanoutExchange("fanoutExchange");
    

@Service("sendService")
public class SendServiceImpl implements SendService 
    //注入Amqp的模板类,利用这个对象来发送和接收消息
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * fanout 交换机
     * @param message
     */
    public void sendFanoutMessage(String message) 
        amqpTemplate.convertAndSend("fanoutExchange","",message);
    

消息接收方

@Configuration
public class RabbitMQConfig 
    //创建一个名字为 fanoutQueue的队列
    @Bean
    public Queue fanoutQueue()
        return new Queue("fanoutQueue");
    
    
    //创建一个名字为 BootFanoutExchange的交换机
    @Bean
    public FanoutExchange fanoutExchange()
        return new FanoutExchange("BootFanoutExchange");
    

    @Bean
    public Binding  fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange)
       //将队列绑定到指定的交换机上
        //参数1 为指定的队列对象
        //参数2 为指定的交换机对象
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService 
    //注入Amqp的模板类,利用这个对象来发送和接收消息
    @Resource
    private AmqpTemplate amqpTemplate;


    @RabbitListener(bindings=
                            @QueueBinding(//@QueueBinding注解要完成队列和交换机的绑定
                                          value = @Queue(),//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
                                          exchange=@Exchange(name="fanoutExchange",type="fanout")//创建一个交换机
                                          )
                            
                   )
    public void fanoutReceive01(String message)
        System.out.println("fanoutReceive01监听器接收的消息----"+message);
    

    @RabbitListener(bindings=
            @QueueBinding(//@QueueBinding注解要完成队列和交换机的绑定
                    value = @Queue(),//@Queue创建一个队列(没有指定参数则表示创建一个随机队列)
                    exchange=@Exchange(name="fanoutExchange",type="fanout")//创建一个交换机
            )
    
    )
    public void fanoutReceive02(String message)
        System.out.println("fanoutReceive02监听器接收的消息----"+message);
    

3、topic交换机

消息发送方

@Configuration
public class RabbitMQConfig 
    //配置一个 Topic 类型的交换
    @Bean
    public TopicExchange topicExchange()
        return new TopicExchange("topicExchange");
    
@Service("sendService")
public class SendServiceImpl implements SendService 
    //注入Amqp的模板类,利用这个对象来发送和接收消息
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * topic 交换机
     * @param message
     */
    public void sendTopicMessage(String message) 
        amqpTemplate.convertAndSend("topicExchange","aa.bb.cc",message);
    

消息接收方

@Configuration
public class RabbitMQConfig 
    @Bean
    public Queue topicQueue()

        return new Queue("bootTopicQueue");
    

    //创建队列
    @Bean
    public Queue topicQueue2()
        return new Queue("topicQueue2");
    

    @Bean
    public TopicExchange topicExchange()
        return new TopicExchange("bootTopicExchange");
    

    @Bean
    public Binding topicBinding(Queue topicQueue,TopicExchange topicExchange)
        /*
        参数1 需要绑定的队列
        参数2 需要绑定的交换机
        参数3 绑定时的RoutingKey
        * */
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("boot");
    

    @Bean
    public Binding  topicBinding2(Queue topicQueue2,TopicExchange topicExchange)
        //将队列绑定到指定交换机
        //参数1 为指定队列对象
        //参数2 为指定的交换机对象
        //参数3 为RoutingKey的匹配规则,#.test表示 可以接收以任意路径靠头的但是必须以test结尾的队列
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("#.text");
    
@Service("receiveService")
public class ReceiveServiceImpl implements ReceiveService 
    //注入Amqp的模板类,利用这个对象来发送和接收消息
    @Resource
    private AmqpTemplate amqpTemplate;

    @RabbitListener(bindings = @QueueBinding(value=@Queue("topic01"),key = "aa",exchange =@Exchange(name = "topicExchange",type = "topic")))
    public void  topicReceive01(String message)
        System.out.println("topic01消费者 ---aa---"+message );
    

    @RabbitListener(bindings = @QueueBinding(value=@Queue("topic02"),key = "aa.*",exchange =@Exchange(name = "topicExchange",type = "topic")))
    public void  topicReceive02(String message)
        System.out.println("topic02消费者 ---aa.*---"+message );
    

    @RabbitListener(bindings = @QueueBinding(value=@Queue("topic03"),key = "aa.#",exchange =@Exchange(name = "topicExchange",type = "topic")))
    public void  topicReceive03(String message)
        System.out.println("topic03消费者 ---aa.#---"+message );
    

运行测试Send消息发送,编写Application.java类

@SpringBootApplication
public class RabbitmqSpringbootSendApplication 

    public static void main(String[] args) 
        ApplicationContext applicationContext = SpringApplication.run(RabbitmqSpringbootSendApplication.class, args);

        SendService sendService = (SendService) applicationContext.getBean("sendService");

//        sendService.sendDirectMessage("Boot的direct测试数据");
//        sendService.sendFanoutMessage("Boot的Fanout测试数据");
        sendService.sendTopicMessage("Boot的Topic测试数据,key为aa.bb.cc");
    

 运行测试Receive消息接收,编写Application.java类

@SpringBootApplication
public class RabbitmqSpringbootReceiveApplication 

    public static void main(String[] args) 

        ApplicationContext applicationContext =  SpringApplication.run(RabbitmqSpringbootReceiveApplication.class, args);
        ReceiveService service = (ReceiveService) applicationContext.getBean("receiveService");
        //使用了消息监听器接收消息那么就不需要调用接收方法来接收消息
//        service.receive();
    

springboot-springboot配置说明(代码片段)

端口号server.port=8000WebURLserver.context-path=/config服务器ip地址server.address=设置httpheader大小注意此处tomcat6-7中默认是8192即8k并且每一个连接都会开辟一个8k的cache修改配置一定注意server.max-http-header-size=设置返回http头server 查看详情

java闭关修炼springboot-springboot整合其他框架(代码片段)

【Java闭关修炼】SpringBoot-SpringBoot整合其他框架SpringBoot整合Junit实现步骤Springboot整合redisSpringboot整合mybatis勾选依赖配置datasource数据源信息定义Mapper接口测试UserXmlMapper配置SpringBoot整合Junit实现步骤用于单元测试首先编写一个业务... 查看详情

springboot-springboot入门简介;javaconfig;核心配置application.yml(properties),多环境设置

一、JavaConfig在Spring3.0之前,我们的bean一直通过XML文件来配置的,后来在Spring3.0之后为我们提供了Java的config版本。而且在Spring4.0之后推荐我们使用JavaConfig:是Spring提供的使用java类配置容器。配置SpringIOC容器的纯Java方法Java... 查看详情

1084

#include<iostream>usingnamespacestd;intmain(){ intra; for(intra=1;ra<=50;ra++){  if(4*ra+2*(50-ra)==160)   cout<<50-ra<<""<<ra; }&nbs 查看详情

latex中\ra是啥意思?

没有\\ra这种东西,只有\\r a这表示在a的脑袋顶上增加一个如“。”的空心圆圈符号。也可写做\\ra但是如果写\\r ab或者\\rab的话只有第一个字母上有空心圆圈。另外,如果是\\r中文则会在“中文”前出现一个空格并且空格... 查看详情

markdownobtenereltextocapturadoenunapreguntadeopcionessimplesconotra(代码片段)

查看详情

ra2.r(代码片段)

查看详情

ra-deepcrossing

1.运用场景????Web-scalemodeling。2.创新点????automaticallycombinesfeaturestoproducesuperiorsmodels.????ResidualUnits.3.算法原理3.1整体框架 3.2Deepcrossing????Deepcrossing论文4.算法理解????引入ResNet思想。 查看详情

如何在 Xcode 中播放 RealAudio (.RA) 文件(远程或本地)?

】如何在Xcode中播放RealAudio(.RA)文件(远程或本地)?【英文标题】:HowtoPlayaRealAudio(.RA)File(RemoteorLocal)inXcode?【发布时间】:2012-03-0523:59:42【问题描述】:我需要能够从Xcode播放RealAudio(.RA)文件。如果我不能直接从URL播放文件,我... 查看详情

sqlbuscarporfechaenuncampofechahora在现场日期时间搜索日期(代码片段)

查看详情

Service Fabric:System.IndexOutOfRangeException -“System.RA”报告属性“ReplicaOpenStatus”的警告

】ServiceFabric:System.IndexOutOfRangeException-“System.RA”报告属性“ReplicaOpenStatus”的警告【英文标题】:ServiceFabric:System.IndexOutOfRangeException-\'System.RA\'reportedWarningforproperty\'ReplicaOpenStatus\'【发布时间】:2021-09-2211:09: 查看详情

raing,this,brother,schoolcoat,goose,写出每个单词的3个同类词

...at、these、thosebrother:sister、mother、fatherschool:hospital、library、postofficecoat:cap、hat、dress、skirtgoose:duck、cat、dog(至于,那个“raing”你是不是打错了啊?要么是rain,要么是raining,哪来的raing啊????)望采纳。。。。。。... 查看详情

uva12333大数,字典树

...。#include<bits/stdc++.h>usingnamespacestd;constintNV=10000;constintra=10;intten[4]={1,ra,ra*ra,ra*ra*r 查看详情

动手配置静态路由

环境:共5台服务器A,B,Ra,Rb,Rc,BA,B两台作为客户端R1,R2,R3三台作为路由器目的:通过配置让A与B经过三个路由器的转发,实现A与B的网络互通准备工作:图示规划:让每一段网络都处于不同的物理网络中.主机网卡数接口别名A1A1Ra2Ra1,Ra2Rb2Rb1,... 查看详情

ra生态之外部中断exit

实现:通过按键形式以及灯的亮灭形式进行演示EXIT配置Stacks->NewStack->Input->ExternalIRQDriveronr_icuR_ICU_ExternalIrqOpen()函数:配置用于外部中断接口的IRQ输入引脚,配置如下所示。/*Configuretheexternalinterrupt.*/fsp_err_terr 查看详情

快速入手瑞萨ra系列mcu指南

...标公众号,不错过精彩内容来源| 野火电子关于瑞萨RA系列MCU瑞萨RAMCU基于32位Arm®Cortex®-M高性能处理器,提供强大的嵌入式安全功能、卓越的CoreMark®性能和超低的运行功率,能够满足客户的不同需求并为助力客户创... 查看详情

ra(数论,莫比乌斯反演,整点统计)

题意:求\[\displaystyle\sum_i=1^n\sum_j=1^n[\mathrmlcm(i,j)>n]\pmod10^9+7\].$n\le10^10$.这是我们考试的一道题...考试的时候以为能找出规律,后来发现还是一道数论题qwq而且部分分很不良心啊,只给了\(O(n)\)多的一点分,我\(O(n\lnn)\)根本没活路..还... 查看详情

ra生态之adc采样

ADC配置点击Stacks->NewStack->Analog->ADCr_adc先是单通道ADC默认12位ADC精度,对齐方式扫描(也最常见的方式)下图是设置回调函数adc_callback,和回调中断优先级为2 在PIN那里选择通道R_ADC_Open()函数:开启和初始化ADC模式(在这... 查看详情