springboot实战系列rabbitmq实现消息发送并实现邮箱发送异常监控报警实战(代码片段)

工藤学编程 工藤学编程     2022-11-30     440

关键词:

大家好,我是工藤学编程 🦉一个正在努力学习的小博主,期待你的关注
作业侠系列最新文章😉Java实现聊天程序
SpringBoot实战系列🐷【SpringBoot实战系列】RabbitMQ实现消息发送并实现邮箱发送异常监控报警实战
环境搭建大集合环境搭建大集合(持续更新)

在本栏中,我们之前已经完成了:
【SpringBoot实战系列】之发送短信验证码
【SpringBoot实战系列】之从Async组件应用实战到ThreadPoolTaskExecutor⾃定义线程池
【SpringBoot实战系列】之图形验证码开发并池化Redis6存储
【SpringBoot实战系列】阿里云OSS接入上传图片实战
【SpringBoot实战系列】Sharding-Jdbc实现分库分表到分布式ID生成器Snowflake自定义wrokId实战

本片速览:
1.RabbitMQ交换机类型
2.RabbitMQ配置开发实战
3.对应controller,service开发
4.业务逻辑消费者编写
5.部署rabbitMQ,并在application中配置rabbitMQ
6.异常监控队列配置
7.异常队列消费者代码如下,实现邮箱发送,监控报警
8.邮箱组件代码以及配置
9.测试结果

RabbitMQ交换机类型

  • 简介
    ⽣产者将消息发送到 Exchange,交换器将消息路由到⼀个或者多个队列中,交换机有多个类型,队列和交换机是多对多的
    关系。
    交换机只负责转发消息,不具备存储消息的能⼒,如果没有队列和exchange绑定,或者没有符合的路由规则,则消息会被丢失
    RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后的基本不⽤
  • 交换机类型
  1. Direct Exchange 定向
    将⼀个队列绑定到交换机上,要求该消息与⼀个特定的路由键完全匹配
    例⼦:如果⼀个队列绑定到该交换机上要求路由键“aabb”,则只有被标记为“aabb”的消息才被转发,不会转发aabb.cc,也不会转发gg.aabb,只会转发aabb
  2. 处理路由健
    Fanout Exchange ⼴播
    只需要简单的将队列绑定到交换机上,⼀个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像⼦⽹⼴播,每台⼦⽹内的主机都获得了⼀份复制的消息
    Fanout交换机转发消息是最快的,⽤于发布订阅,⼴播形式,中⽂是扇形
    不处理路由健
  • Topic Exchange 通配符
    主题交换机是⼀种发布/订阅的模式,结合了直连交换机与扇形交换机的特点
    将路由键和某模式进⾏匹配。此时队列需要绑定要⼀个模式上
    符号“#”匹配⼀个或多个词,符号 ∗ * 匹配不多不少⼀个词
    例⼦:因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*”只会匹配到“abc.def”。

RabbitMQ配置开发实战

@Configuration
@Data
public class RabbitMQConfig 
    /**
     * 交换机
     */
    private String shortLinkEventExchange = "short_link.event.exchange";

    /**
     * 创建交换机 Topic类型
     * ⼀般⼀个微服务⼀个交换机
     *
     * @return
     */
    @Bean
    public Exchange shortLinkEventExchange() 
        return new TopicExchange(shortLinkEventExchange, true, false);
        //return newFanoutExchange(shortLinkEventExchange,true,false);
    
   
    /**
     * 新增短链 队列
     */
    private String shortLinkAddLinkQueue = "short_link.add.link.queue";
    /**
     * 新增短链映射 队列
     */
    private String shortLinkAddMappingQueue = "short_link.add.mapping.queue";
    /**
     * 新增短链具体的routingKey,【发送消息使⽤】
     */
    private String shortLinkAddRoutingKey = "short_link.add.link.mapping.routing.key";
    /**
     * topic类型的binding key,⽤于绑定队列和交换机,是⽤
     * 于 link 消费者
     */
    private String shortLinkAddLinkBindingKey = "short_link.add.link.*.routing.key";
    /**
     * topic类型的binding key,⽤于绑定队列和交换机,是⽤
     * 于 mapping 消费者
     */
    private String shortLinkAddMappingBindingKey = "short_link.add.*.mapping.routing.key";

    /**
     * 新增短链api队列和交换机的绑定关系建⽴
     */
    @Bean
    public Binding shortLinkAddApiBinding() 
        return new Binding(shortLinkAddLinkQueue, Binding.DestinationType.QUEUE, shortLinkEventExchange, shortLinkAddLinkBindingKey, null);
    

    /**
     * 新增短链mapping队列和交换机的绑定关系建⽴
     */
    @Bean
    public Binding shortLinkAddMappingBinding() 
        return new Binding(shortLinkAddMappingQueue, Binding.DestinationType.QUEUE, shortLinkEventExchange, shortLinkAddMappingBindingKey, null);
    

    /**
     * 新增短链api 普通队列,⽤于被监听
     */
    @Bean
    public Queue shortLinkAddLinkQueue() 
        return new Queue(shortLinkAddLinkQueue, true, false, false);
    

    /**
     * 新增短链mapping 普通队列,⽤于被监听
     */
    @Bean
    public Queue shortLinkAddMappingQueue() 
        return new Queue(shortLinkAddMappingQueue, true, false, false);
    

你要发送的消息,用一个类封装起来即可

对应controller,service开发

@PostMapping("/add")
    public JsonData createShortLink(@RequestBody ShortLinkAddRequest shortLinkAddRequest) 
        JsonData jsonData = shortLinkService.createShortLink(shortLinkAddRequest);
        return jsonData;
    

对应service,注入配置好的RabbitConfig以及rabbitTemplate即可

  @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitMQConfig rabbitMQConfig;

发送代码

 @Override
 public JsonData createShortLink(ShortLinkAddRequest shortLinkAddRequest) 


        /**
         * 使用lombok建造者模式构建你要发送的消息,然后调用rabbitTemplate.convertAndSend方法,配置对应的交换机,路由key与消息即可
         */
        Long account_no = LoginInterceptor.threadLocal.get().getAccountNo();

        EventMessage eventMessage = EventMessage.builder().accountNo(account_no).messageId(IDUtil.geneSnowFlakeID().toString()).content(JsonUtil.obj2Json(shortLinkAddRequest)).eventMessageType(EventMessageType.SHORT_LINK_ADD.name()).build();

        rabbitTemplate.convertAndSend(rabbitMQConfig.getShortLinkEventExchange(),rabbitMQConfig.getShortLinkAddRoutingKey(),eventMessage);

        return JsonData.buildSuccess();
    

消费者编写

@RabbitListener(queuesToDeclare = @Queue("short_link.add.link.queue"))
@Slf4j
@Component
public class ShortLinkAddLinkMQListener 

    @RabbitHandler
    public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel)throws Exception
        log.info("监听到消息 ShortLinkAddLinkMQListener:message消息内容:", message);
        try 
            int i=1/0;
            //TODO 处理业务
         catch (Exception e) 
            // 处理业务失败,还要进⾏其他操作,⽐如记录失败原因
            log.error("消费失败", eventMessage);
            throw new Exception(BizCodeEnum.MQ_CONSUME_EXCEPTION.getMessage());
        
        log.info("消费成功", eventMessage);
        //手动确认消息消费成功
       // channel.basicAck(msgTag, false);
    



application中配置rabbitMQ,部署可见环境搭建大集合(持续更新)

##----------rabbit配置--------------
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
#需要⼿⼯创建虚拟主机
spring.rabbitmq.virtual-host=dev
spring.rabbitmq.username=$admin
spring.rabbitmq.password=$password
#消息确认⽅式,manual(⼿动ack) 和auto(⾃动ack)
spring.rabbitmq.listener.simple.acknowledgemode=auto
#开启重试,消费者代码不能添加try catch捕获不往外抛异常
spring.rabbitmq.listener.simple.retry.enabled=true
#最⼤重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=4
# 重试消息的时间间隔,5秒
spring.rabbitmq.listener.simple.retry.initial-interval=5000

将配置中host,port,username,password改为你自己的即可,并且记得去RabbitMQ可视化控制台创建一个名为dev的host,具体操作如下:

注意:

加了@bean配置交换机和queue,启动项⽬却没⾃动化创建队列
RabbitMQ懒加载模式, 需要配置消费者监听才会创建,
@RabbitListener(queues =“short_link.add.link.queue”)
另外种⽅式(若Mq中⽆相应名称的队列,也会⾃动创建Queue)
@RabbitListener(queuesToDeclare = @Queue(“short_link.add.link.queue”) )

因为我们消费者代码逻辑中有1/0,用于模拟业务过程出错,这样即实战了消息发送也实现了异常监控

异常监控队列配置

@Configuration
@Data
public class RabbitMQErrorConfig 

    private String shortLinkErrorExchange = "short_link.error.exchange";
    private String shortLinkErrorQueue = "short_link.error.queue";
    private String shortLinkErrorRoutingKey = "short_link.error.routing.key";
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 异常交换机
     *
     * @return
     */
    @Bean
    public TopicExchange errorTopicExchange() 
        return new TopicExchange(shortLinkErrorExchange, true, false);
    

    /**
     * 异常队列
     *
     * @return
     */
    @Bean
    public Queue errorQueue() 
        return new Queue(shortLinkErrorQueue, true);
    

    /**
     * 队列与交换机进⾏绑定
     *
     * @return
     */
    @Bean
    public Binding BindingErrorQueueAndExchange(Queue errorQueue, TopicExchange errorTopicExchange) 
        return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(shortLinkErrorRoutingKey);
    

    /**
     * 配置 RepublishMessageRecoverer
     * ⽤途:消息重试⼀定次数后,⽤特定的routingKey转发到指
     * 定的交换机中,⽅便后续排查和告警
     * <p>
     * 顶层是 MessageRecoverer接⼝,多个实现类
     *
     * @return
     */
    @Bean
    public MessageRecoverer messageRecoverer() 
        return new RepublishMessageRecoverer(rabbitTemplate, shortLinkErrorExchange, shortLinkErrorRoutingKey);
    

对应消费者代码如下,实现邮箱发送,监控报警

@RabbitListener(queuesToDeclare = @Queue("short_link.error.queue"))
@Slf4j
@Component
public class ShortLinkErrorMQListener 

    public static final String SUBJECT = "短链监控告警";
    public static final String CONTENT = "用户%s短链创建%s,消息消费出现异常";

    @Autowired
    private ErrorNotifyComponent errorNotifyComponent;
    @RabbitHandler
    public void shortLinkHandler(EventMessage eventMessage, Message message, Channel channel)throws Exception
        log.info("监听到消息 ShortLinkAddLinkMQListener:message消息内容:", message);
        try 

            errorNotifyComponent.sendMail("110@qq.com",SUBJECT,String.format(CONTENT,eventMessage.getAccountNo(),eventMessage.getContent()));
            log.info("发送成功");
            //TODO 处理业务
         catch (Exception e) 
            // 处理业务失败,还要进⾏其他操作,⽐如记录失败原因
            log.error("消费失败", eventMessage);
            throw new Exception(BizCodeEnum.MQ_CONSUME_EXCEPTION.getMessage());
        
        log.info("消费成功", eventMessage);
        //确认消息消费成功
       // channel.basicAck(msgTag, false);
    



ErrorNotifyComponent实现邮箱发送代码及配置,发送端使⽤⽹易邮箱https://mail.126.com/

具体如何获得授权码,请见博客网易邮箱获取授权码

application中添加配置如下

spring.mail.host=smtp.126.com
spring.mail.username=$你注册的网易邮箱账号
spring.mail.password=$得到的授权码
spring.mail.from=$你注册的网易邮箱账号
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.ssl.enable=true
spring.mail.default-encoding=utf-8

邮箱组件代码

@Component
@Slf4j
public class ErrorNotifyComponent 
    @Autowired
    private JavaMailSender mailSender;

    @Value("$spring.mail.from")
    private String from;
    @Async
    public void sendMail(String to, String subject, String content) 
        SimpleMailMessage message = new SimpleMailMessage();

        message.setFrom(from);

        message.setTo(to);

        message.setSubject(subject);

        message.setText(content);

        mailSender.send(message);

        log.info("邮件发送成功:",message.toString());
    

使用postman测试后,发现重试之后报了错,但是进入了异常队列并成功发送邮件

springboot系列——7步集成rabbitmq

...帮助我们实现异步、削峰的目的。今天这篇,我们来看看SpringBoot是如何集成RabbitMQ,发送消息和消费消息的。同时我们介绍下死信队列。集成RabbitMQ集成RabbitMQ只需要如下几步即可1、添加maven依赖<!--rabbitmq--><dependency>??<... 查看详情

springboot(二十二)集成rabbitmq---mq实战演练

RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循MozillaPublicLicense开源协议。RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。消息中间件的工作过程可以用生... 查看详情

springboot整合rabbitmq之发送接收消息实战

实战前言前几篇文章中,我们介绍了SpringBoot整合RabbitMQ的配置以及实战了Spring的事件驱动模型,这两篇文章对于我们后续实战RabbitMQ其他知识要点将起到奠基的作用的。特别是Spring的事件驱动模型,当我们全篇实战完毕RabbitMQ并... 查看详情

rabbitmq实战:可用性分析和实现

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记。上一篇介绍了各种场景下的最佳实践,大部分场景可以使用「发后即忘」的模式,不需要响应,如果需要响应,可以使用RabbitMQ的RPC模型。RabbitMQ以异步的方式... 查看详情

springboot结合rabbitmq与redis实现商品的并发下单springboot系列12(代码片段)

...ringBooot项目开发与SpringCloud微服务系列项目开发1项目准备SpringBoot整合RabbitMQ消息队列【SpringBoot系列11】本文章基于这个项目来开发本文章是系列文章,每节文章都有对应的代码,每节的源码都是在上一节的基础上配置而... 查看详情

springboot实战之rabbitmq

什么是RabbitMQ?RabbitMQ是一个消息代理。它的核心原理非常简单:接收和发送消息。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ就扮演着邮箱、邮局以及邮递员... 查看详情

springboot之rabbitmq

RabbitMQ即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将RocketMQ捐献给了Apache,当然了今天的主角还是讲Rabbi... 查看详情

springboot整合rabbitmq之典型应用场景实战二

实战前言RabbitMQ作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务... 查看详情

springboot整合rabbitmq之典型应用场景实战三

实战前言RabbitMQ作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务... 查看详情

springboot系列-springboot整合rabbitmq

一RabbitMQ的介绍    RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache).  消息中间件的工作过程可... 查看详情

六.rabbitmq消息队列的基础+实战

六.RabbitMQ消息队列的基础+实战1.消息队列简介2.消息队列的使用流程3.windows下的安装RabbitMQ4.rabbitmq和springboot结合5.rabbitmq结合springboot的重要类未完待续六.RabbitMQ消息队列的基础+实战1.消息队列简介①消息队列概 查看详情

springboot整合rabbitmq之典型应用场景实战一

实战前言RabbitMQ作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务... 查看详情

javaspringboot集成rabbitmq实战和总结(代码片段)

...不错。因为书中讲的都是Python和Php的例子,所以自己结合SpringBoot文档和朱小厮的博客做了一些总结,写了一些Springboot的例子。交换器、队列、绑定的声明SpringAMQP项目对RabbitMQ做了很好的封装,可以很方便的手动声明队列,交换... 查看详情

rabbitmq一文彻底弄懂rabbitmq的四种交换机原理及springboot实战应用(代码片段)

...原理及实战应用交换机概念direct直连交换机工作模式图解springboot代码Fanout扇出交换机工作模式图解springboot代码Topic主题交换机工作模式图解springboot代码header交换机交换机概念交换机可以理解成具有路由表的路由程序,仅此... 查看详情

玩转rabbitmq系列02:rabbitmq保姆级安装教程与基本消息模型实战(代码片段)

...向:java后端开发🎁我的上一篇文章:【玩转Rabbitmq系列】01:一文带你敲响Rabbitmq的大门💕如果我的文章对你有帮助,点赞、收藏、留言都是对我最大的动力【玩转Rabbitmq系列】文章直通车~【玩转Rabbitmq系列】01... 查看详情

rabbitmq实战:理解消息通信(代码片段)

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记。前段时间总结完了「深入浅出MyBatis」系列,对MyBatis有了更全面和深入的了解,在掘金社区也收到了一些博友的喜欢,很高兴。另外,短暂的陪产假就要结束... 查看详情

springboot整合rabbitmq之整合配置篇

实战背景:RabbitMQ实战第一阶段-RabbitMQ的官网拜读已经结束了,相信诸位童鞋或多或少都能入了个门,如果还是觉得迷迷糊糊似懂非懂的,那我建议诸位可以亲自去拜读拜读官网的技术手册或者看多几篇我的视频跟源码!因为接... 查看详情

springboot整合rabbitmq:5种模式实战(代码片段)

一、环境准备1、pom依赖<!--父工程依赖--><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.6.RELEASE</vers 查看详情