rabbitmq如何保证消息可靠性(代码片段)

HelloWorld搬运工 HelloWorld搬运工     2023-03-11     195

关键词:

个人博客请访问 http://www.x0100.top               

  • 如何保证消息的可靠性?

  • 消息队列如何进行限流?

  • 如何设置延时队列进行延时消费?

1. 📖如何保证消息的可靠性?

先来看看我们的万年老图,从图上我们大概可以看出来一个消息会经历四个节点,只有保证这四个节点的可靠性才能保证整个系统的可靠性。

  • 生产者发出后保证到达了MQ。

  • MQ收到消息保证分发到了消息对应的Exchange。

  • Exchange分发消息入队之后保证消息的持久性。

  • 消费者收到消息之后保证消息的正确消费。

经历了这四个保证,我们才能保证消息的可靠性,从而保证消息不会丢失。

2. 🔍生产者发送消息到MQ失败

我们的生产者发送消息之后可能由于网络闪断等各种原因导致我们的消息并没有发送到MQ之中,但是这个时候我们生产端又不知道我们的消息没有发出去,这就会造成消息的丢失。

为了解决这个问题,RabbitMQ引入了事务机制发送方确认机制(publisher confirm),由于事务机制过于耗费性能所以一般不用,这里我着重讲述发送方确认机制

这个机制很好理解,就是消息发送到MQ那端之后,MQ会回一个确认收到的消息给我们


打开此功能需要配置,接下来我来演示一下配置:

spring:  rabbitmq:    addresses: 127.0.0.1    host: 5672    username: guest    password: guest    virtual-host: /    # 打开消息确认机制    publisher-confirm-type: correlated

我们只需要在配置里面打开消息确认即可。

生产者:

    public void sendAndConfirm()         User user = new User();
        log.info("Message content : " + user);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());        rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user,correlationData);        log.info("消息发送完毕。");
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()            @Override            public void confirm(CorrelationData correlationData, boolean ack, String cause)                 log.info("CorrelationData content : " + correlationData);                log.info("Ack status : " + ack);                log.info("Cause content : " + cause);                if(ack)                    log.info("消息成功发送,订单入库,更改订单状态");                else                    log.info("消息发送失败:"+correlationData+", 出现异常:"+cause);                                    );    

生产者代码里我们看到又多了一个参数:CorrelationData,这个参数是用来做消息的唯一标识,同时我们打开消息确认之后需要对rabbitTemplate多设置一个setConfirmCallback,参数是一个匿名类,我们消息确认成功or失败之后的处理就是写在这个匿名类里面。

比如一条订单消息,当消息确认到达MQ确认之后再行入库或者修改订单的节点状态,如果消息没有成功到达MQ可以进行一次记录或者将订单状态修改。

Tip:消息确认失败不只有消息没发过去会触发,消息发过去但是找不到对应的Exchange,也会触发。

3. 📔MQ接收失败或者路由失败

生产者的发送消息处理好了之后,我们就可以来看看MQ端的处理,MQ可能出现两个问题:

  1. 消息找不到对应的Exchange。

  2. 找到了Exchange但是找不到对应的Queue。

这两种情况都可以用RabbitMQ提供的mandatory参数来解决,它会设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。

我们既然要做可靠性,当然是设置为返回到客户端(true是返回客户端,false是自动删除)。


配置:

spring:  rabbitmq:    addresses: 127.0.0.1    host: 5672    username: guest    password: guest    virtual-host: /    # 打开消息确认机制    publisher-confirm-type: correlated    # 打开消息返回    publisher-returns: true    template:      mandatory: true

我们只需要在配置里面打开消息返回即可,template.mandatory: true这一步不要少~

生产者:

    public void sendAndReturn()         User user = new User();
        log.info("Message content : " + user);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->             log.info("被退回的消息为:", message);            log.info("replyCode:", replyCode);            log.info("replyText:", replyText);            log.info("exchange:", exchange);            log.info("routingKey:", routingKey);        );
        rabbitTemplate.convertAndSend("fail",user);        log.info("消息发送完毕。");    

这里我们可以拿到被退回消息的所有信息,然后再进行处理,比如放到一个新的队列单独处理,路由失败一般都是配置问题了。

4. 📑消息入队之后MQ宕机

到这一步基本都是一些很小概率的问题了,比如MQ突然宕机了或者被关闭了,这种问题就必须要对消息做持久化,以便MQ重新启动之后消息还能重新恢复过来。

消息的持久化要做,但是不能只做消息的持久化,还要做队列的持久化和Exchange的持久化。

    @Bean    public DirectExchange directExchange()         // 三个构造参数:name durable autoDelete        return new DirectExchange("directExchange", false, false);    
    @Bean    public Queue erduo()         // 其三个参数:durable exclusive autoDelete        // 一般只设置一下持久化即可        return new Queue("erduo",true);    

创建Exchange和队列时只要设置好持久化,发送的消息默认就是持久化消息。

设置持久化时一定要将Exchange和队列都设置上持久化:

单单只设置Exchange持久化,重启之后队列会丢失。单单只设置队列的持久化,重启之后Exchange会消失,既而消息也丢失,所以如果不两个一块设置持久化将毫无意义。

Tip: 这些都是MQ宕机引起的问题,如果出现服务器宕机或者磁盘损坏则上面的手段统统无效,必须引入镜像队列,做异地多活来抵御这种不可抗因素。

5. 📌消费者无法正常消费

最后一步会出问题的地方就在消费者端了,不过这个解决问题的方法我们之前的文章已经说过了,就是消费者的消息确认。

spring:  rabbitmq:    addresses: 127.0.0.1    host: 5672    username: guest    password: guest    virtual-host: /    # 手动确认消息    listener:      simple:          acknowledge-mode: manual

打开手动消息确认之后,只要我们这条消息没有成功消费,无论中间是出现消费者宕机还是代码异常,只要连接断开之后这条信息还没有被消费那么这条消息就会被重新放入队列再次被消费。

当然这也可能会出现重复消费的情况,不过在分布式系统中幂等性是一定要做的,所以一般重复消费都会被接口的幂等给拦掉。

所谓幂等性就是:一个操作多次执行产生的结果与一次执行产生的结果一致。

幂等性相关内容不在本章讨论范围~所以我就不多做阐述了。

6. 💡消息可靠性案例

这个图是我很早之前画的,是为了记录当时使用RabbitMQ做消息可靠性的具体做法,这里我正好拿出来做个例子给大家看一看。

这个例子中的消息是先入库的,然后生产者从DB里面拿到数据包装成消息发给MQ,经过消费者消费之后对DB数据的状态进行更改,然后重新入库。

这中间有任何步骤失败,数据的状态都是没有更新的,这时通过一个定时任务不停的去刷库,找到有问题的数据将它重新扔到生产者那里进行重新投递。

这个方案其实和网上的很多方案大同小异,基础的可靠性保证之后,定时任务做一个兜底进行不断的扫描,力图100%可靠性。

rabbitmq高级之如何保证消息发送可靠性(代码片段)

1.RabbitMq的发送机制学过RabbitMq的同学们大概都知道了RabbitMq发送机制引入了Exchange(交换机的概念),消息发送方,首先把消息发送到交换机这是第一个步骤,然后交换机在把消息路由到不同的队列中(Queue)这是第二个步骤,在有不同的消... 查看详情

消息队列专题(高级特性篇):rabbitmq如何保证消息的可靠性投递传输和消费(代码片段)

我们使用RabbitMQ进行消息处理一般都需要保证消息的可靠性,而消息的可靠性又可以根据消息的不同处理阶段分为可靠性投递、传输和消费。本篇博客将针对这三种情况介绍相应的设计方案,首先来看一下消息的可靠性投... 查看详情

rabbitmq(代码片段)

...什么RabbitMQ的特点AMQP是什么?RabbitMQ如何保证消息的可靠性?RabbitMQ如何保证队列的高可用?RabbitMQ使用场景高内聚,低耦合rabbitmq 查看详情

03rabbitmq进阶1之可靠性投递(代码片段)

目录Pt1可靠性投递Pt1.1可靠性风险Pt1.2保证生产者发送消息给Broker(1)事务模式(Transaction)(2)确认模式(Confirm)Pt1.3保证Exchange路由消息到队列(1)消息回发(2)消息路由到备份ExchangePt1.4保证消息在队列存储(1)Queue持久... 查看详情

rabbitmq学习总结(10)——rabbitmq如何保证消息的可靠性

一、丢失场景RabbitMQ丢失的以下3种情况:(1)生产者:生产者发送消息至MQ的数据丢失 查看详情

rabbitmq如何保证消息可靠性不丢失(代码片段)

之前我们简单介绍了rabbitmq的功能。他的作用就是方便我们的消息解耦。紧接着问题就会暴露出来。解耦就设计到双方系统不稳定问题。在mq中有生产者、mq、消费者三个角色。其中一个角色down机或者重启后。就设计到消息的丢... 查看详情

面试官:如何保证rocketmq/rabbitmq消息数据100%不丢失

参考技术A在分布式系统的网络中,保证消息的可靠性?阿里技术分享了一篇文章:RocketMQ如何保证消息的可靠性?在文中详情介绍了RocketMQ是如何最大限度的保证消息不丢失的呢?分析的思路就是一条消息从产生到最终消费的整个... 查看详情

rabbitmq发布订阅持久化(代码片段)

...开发者的声音,稳定、可靠是第一考虑,为了消息传输的可靠性传输,RabbitMQ提供了多种途径的消息持久化保证:Exchange持久化、Queue持久化及Message的持久化。以保证RabbitMQ在退出或Crash等异常情况下,消息不会丢失。RabbitMQ提供... 查看详情

rabbitmq--高级特性(代码片段)

在上一篇文章讲解MQ消息可靠性投递和幂等性中有提到confirm机制的重要性,现在更相信的说明一下一、Confirm机制  Confirm就是消息确认,当Producer发送消息,如果Broker收到消息,会回复一个应答,我们可以以此来确认消息是否... 查看详情

rabbitmq生产方式和解决消息可靠性投递及其他问题(代码片段)

RabbitMQ生产方式和解决消息可靠性投递及其他问题3种Exchange(交换机)与对应生产方式Queue队列重载参数RabbitMQ生产方式Workqueues(工作模式)Routing(路由模式)Publish/Subscribe(订阅模式)Topics(主题模式)消息投递流程考虑以下问题如何保证消... 查看详情

如何保证消息队列的可靠性传输?

参考技术A消息丢失分成三种情况,可能出现生产者、RabbitMQ、消费者。首先要确保写入RabbitMQ的消息别丢,消息队列通过请求确认机制,保证消息的可靠传输。生产开启comfirm模式,在生产者开启comfirm模式之后,每次发送消息都... 查看详情

rabbitmq消息可靠性之确认模式通俗易懂超详细内含案例(代码片段)

RabbitMq保证消息可靠性之确认模式前提完成SpringBoot整合RabbitMq中的Topic通配符模式一、更改Producer工程的application.yml文件spring:rabbitmq:host:localhostport:5672virtual-host:/username:usernamepassword:passwordpublisher-confirm-type:co 查看详情

rabbitmq消息可靠性之回退模式通俗易懂超详细内含案例(代码片段)

RabbitMq保证消息可靠性之回退模式前提完成SpringBoot整合RabbitMq中的Topic通配符模式一、更改Producer工程的application.yml文件spring:rabbitmq:host:localhostport:5672virtual-host:/username:usernamepassword:passwordpublisher-returns:true#开启 查看详情

rabbitmq-confirm发送消息确认深入探讨(代码片段)

...几种工作模式介绍与实践)我们知道,如果要保证消息的可靠性,需要对消息进行持久化处理,然而消息持久化除了需要代码的设置之外,还有一个重要步骤是至关重要的,那就是保证你的消息顺利进入Broker(代理服务器),&nb... 查看详情

rabbitmq消息中间件(代码片段)

...与Kafka等消息队列相比,RabbitMQ最大的优势在于其较高的可靠性:提供确认(ACK)和重传机制保证消息完成消费,消费者异常不会导致消息丢失提供消息持久化机制,broker崩溃不会导致消息丢失集群模式下工作,保证高可用因为具有较高... 查看详情

面试rabbitmq面试题(代码片段)

...是列队为什么使用MQMQ的优点消息队列有什么优缺点?RabbitMQ有什么优缺点?Kafka、ActiveMQ、RabbitMQ、RocketMQ有什么优缺点?MQ有哪些常见问题?如何解决这些问题?什么是RabbitMQ?rabbitmq的使用场景RabbitMQ基本... 查看详情

03rabbitmq进阶1之可靠性投递(代码片段)

目录Pt1可靠性投递Pt1.1可靠性风险Pt1.2保证生产者发送消息给Broker(1)事务模式(Transaction)(2)确认模式(Confirm)Pt1.3保证Exchange路由消息到队列(1)消息回发(2)消息路由到备份ExchangePt1.4保证消息在队列存储(1)Queue持久... 查看详情

03rabbitmq进阶1之可靠性投递(代码片段)

目录Pt1可靠性投递Pt1.1可靠性风险Pt1.2保证生产者发送消息给Broker(1)事务模式(Transaction)(2)确认模式(Confirm)Pt1.3保证Exchange路由消息到队列(1)消息回发(2)消息路由到备份ExchangePt1.4保证消息在队列存储(1)Queue持久... 查看详情