浅谈rocketmq如何保证消息不丢失(代码片段)

默辨 默辨     2022-12-04     181

关键词:

RocketMQ如何保证消息不丢失?

MQ如何避免消息不丢失,这个问题是所有MQ都需要面对的一个共性问题。大致的解决思路都是一致的,但是针对不同的MQ产品又有不同的解决方案。分析这个问题可以从以下几个角度入手。


一、明确丢失消息场景

下图是基础的MQ系统架构。

其中,1,2,4三个场景都是跨网络的,跨网络就肯定会有丢消息的可能。

然后关于3这个环节,通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。



二、RocketMQ避免消息丢失解决方案

1、消息生产者使用事务消息

RocketMQ的事务消息机制就是为了保证零丢失来设计的,并且经过阿里的验证,肯定是非常靠谱的。

事务消息的实现机制可以参考下图,事务消息正是基于这一套架构设计,继而保证了消息不丢失。



这里有几个问题需要明确

1)为什么要发送half消息?

half消息是在订单系统进行下单操作前发送,并且对下游服务的消费者是不可见的(放入对应的half消息对应的RMQ_SYS_TRANS_HALF_TOPIC队列)。这个消息的作用更多的体现在确认RocketMQ的服务是否正常。相当于嗅探下RocketMQ服务是否正常,并且通知RocketMQ。

如果没有half消息这个流程,那我们通常是会在订单系统中先完成下单,再发送消息给MQ。这时候写入消息到MQ如果失败就会出现数据不一致的问题。而half消息如果写入失败,我们就可以认为MQ的服务是有问题的,这时,就不能通知下游服务了。我们可以在下单时给订单一个状态标记,然后等待MQ服务正常后再进行补偿操作,等MQ服务正常后重新下单通知下游服务。


2)订单系统写数据库失败了怎么办?

这个问题我们同样比较下没有使用事务消息机制时会怎么办?如果没有使用事务消息,我们只能判断下单失败,抛出了异常,那就不往MQ发消息了,这样至少保证不会对下游服务进行错误的通知。但是这样的话,如果过一段时间数据库恢复过来了,这个消息就无法再次发送了。当然,也可以设计另外的补偿机制,例如将订单数据缓存起来,再启动一个线程定时尝试往数据库写。而如果使用事务消息机制,就可以有一种更优雅的方案。

如果下单时,写数据库失败(可能是数据库崩了,需要等一段时间才能恢复)。那我们可以另外找个地方把订单消息先缓存起来(Redis、文本或者其他方式),然后给RocketMQ返回一个UNKNOWN状态。这样RocketMQ就会过一段时间来回查事务状态。我们就可以在回查事务状态时再尝试把订单数据写入数据库,如果数据库这时候已经恢复了,那就能完整正常的下单,再继续后面的业务。这样这个订单的消息就不会因为数据库临时崩了而丢失。


3)half消息写入成功后RocketMQ挂了怎么办?

我们需要注意下,在事务消息的处理机制中,未知状态的事务状态回查是由RocketMQ的Broker主动发起的。也就是说如果出现了这种情况,那RocketMQ就不会回调到事务消息中回查事务状态的服务(第六步)。这时,我们就可以将订单一直标记为"新下单"的状态。而等RocketMQ恢复后,只要存储的消息没有丢失,RocketMQ就会再次继续状态回查的流程。


4)下单成功后如何优雅的等待支付成功?

在订单场景下,通常会要求下单完成后,客户在一定时间内,例如10分钟,内完成订单支付,支付完成后才会通知下游服务进行进一步的营销补偿。

如果不用事务消息,那通常会怎么办?

最简单的方式是启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单的下单时间,将超过时间的订单回收。这种方式显然是有很大问题的,需要定时扫描很庞大的一个订单信息,这对系统是个不小的压力。

那更进一步的方案是什么呢?是不是就可以使用RocketMQ提供的延迟消息机制。往MQ发一个延迟1分钟的消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知。而如果没有支付,就再发一个延迟1分钟的消息。最终在第十个消息时把订单回收。这个方案就不用对全部的订单表进行扫描,而只需要每次处理一个单独的订单消息。

那如果使用上了事务消息呢?我们就可以用事务消息的状态回查机制来替代定时的任务。在下单时,给Broker返回一个UNKNOWN的未知状态。而在状态回查的方法中去查询订单的支付状态。这样整个业务逻辑就会简单很多。我们只需要配置RocketMQ中的事务消息回查次数(默认15次)和事务回查间隔时间(messageDelayLevel),就可以更优雅的完成这个支付状态检查的需求。


6)事务消息机制的作用

整体来说,在订单这个场景下,消息不丢失的问题实际上就还是转化成了下单这个业务与下游服务的业务的分布式事务一致性问题。而事务一致性问题一直以来都是一个非常复杂的问题。而RocketMQ的事务消息机制,实际上只保证了整个事务消息的一半,他保证的是订单系统下单和发消息这两个事件的事务一致性,而对下游服务的事务并没有保证。但是即便如此,也是分布式事务的一个很好的降级方案。目前来看,也是业内最好的降级方案。



2、配置同步刷盘+Dledger主从架构

1)同步刷盘

刷盘策略指的是broker中消息的落盘方式,即消息发送到broker内存后消息持久化到磁盘的方式。分为 同步刷盘与异步刷盘:

  • 同步刷盘:当消息持久化到broker的磁盘后才算是消息写入成功。

  • 异步刷盘:当消息写入到broker的内存后即表示消息写入成功,无需等待消息持久化到磁盘。异步刷盘策略会降低系统的写入延迟,RT变小,提高了系统的吞吐量 。消息写入到Broker的内存,一般是写入到了PageCache。对于异步刷盘策略,消息会写入到PageCache后立即返回成功ACK。但并不会立即做落盘操 作,而是当PageCache到达一定量时会自动进行落盘

所以,我们可以简单的把RocketMQ的刷盘方式 flushDiskType配置成同步刷盘就可以保证消息在刷盘过程中不会丢失了。


2)Dledger的文件同步

在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式保证文件在主从之间成功同步。

简单来说,数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。

  1. Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。
  2. 接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。
  3. 最后, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。



3、消息消费者端使用同步消费机制

正常情况下,消费者端都是需要先处理本地事务,然后再给MQ一个ACK响应,这时MQ就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。但是也会有下面这种情况会造成服务端消息丢失。

比如下面的代码示例,接收到MQ的消息后,我们开启一个新的线程处理业务逻辑,并异步返回一个成功标记。这时就会出现业务逻辑处理失败,但是由于成功的消息已经返回,此时就出现了数据不一致的问题。

这种异步消费的方式,就有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能。所以使用同步消费机制,就能避免该阶段丢消息的情况

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.registerMessageListener(new MessageListenerConcurrently() 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) 
        new Thread()
            public void run()
                //处理业务逻辑
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            
        ;
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    
);



4、设计降级方案

NameServer挂了如何保证消息不丢失,这个是RocketMQ特有的问题

NameServer在RocketMQ中,是扮演的一个路由中心的角色,提供到Broker的路由功能。但是其实路由中心这样的功能,在所有的MQ中都是需要的。kafka是用zookeeper和一个作为Controller的Broker一起来提供路由服务,整个功能是相当复杂的。而RabbitMQ是由每一个Broker来提供路由服务。而只有RocketMQ把这个路由中心单独抽取了出来,并独立部署。

NameServer集群中任意多的节点挂掉,都不会影响他提供的路由功能。那如果集群中所有的NameServer节点都挂了呢?


在这种情况下,RocketMQ相当于整个服务都不可用了,那他本身肯定无法给我们保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如在订单系统中,如果多次尝试发送RocketMQ不成功,那就只能另外找给地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。整个这套降级的机制,在大型互联网项目中,都是必须要有的。

开发者涨薪指南 48位大咖的思考法则、工作方式、逻辑体系

面试官再问我如何保证rocketmq不丢失消息,这回我笑了!(代码片段)

...?我哭了!』,这篇文章承接这个主题,来聊聊如何保证RocketMQ不丢失消息。0x00.消息的发送流程一条消息从生产到被消费,将会经历三个阶段:生产阶段,Producer新建消息,然后通过网络将消息投递给MQBroker存储阶段,消息将会存... 查看详情

面试官:如何保证rocketmq/rabbitmq消息数据100%不丢失

...网络中,保证消息的可靠性?阿里技术分享了一篇文章:RocketMQ如何保证消息的可靠性?在文中详情介绍了RocketMQ是如何最大限度的保证消息不丢失的呢?分析的思路就是一条消息从产生到最终消费的整个过程,在三个关键的阶段... 查看详情

各种消息队列如何选择?为何选择rocketmq来保证消息不丢失,及应该采用rocketmq哪种通信模式?

前言消息队列本质上来说,是一个符合先进先出原则的单向队列:一方发送消息并存入消息队列尾部(生产者投递消息),一方从消息队列的头部取出消息(消费者消费消息)。 但对于一个成熟可靠的消息队列来说,所需要解决... 查看详情

rocketmq是是如何管理消费进度的?又是如何保证消息成功消费的?(代码片段)

RocketMQ消费者保障消息确认机制consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)什么是ACK消息确认机... 查看详情

回答面试官:如何保证消息不丢失

rocketmq是阿里开源的一个性能很强大的消息队列,很多公司都在用,而且经历了多次双十一的洗礼,支持多种特性对于这个技术点不知道大家掌握的如何了,消息队列现在应该是公司必备的技能之一了,无论是... 查看详情

浅谈消息队列之rocketmq

...何保证消息的顺序性,即保证数据的逻辑正确性简单分析RocketMQ的原理高可用上架构  NameServer:维持心跳和提供Topic-Broker的关系数据,多个Namesrv之间相互没有通信,单台Namesrv宕机不影响其他Namesrv与集群;即使 查看详情

各种消息队列如何选择?为何选择rocketmq来保证消息不丢失,及应该采用rocketmq哪种通信模式?

前言消息队列本质上来说,是一个符合先进先出原则的单向队列:一方发送消息并存入消息队列尾部(生产者投递消息),一方从消息队列的头部取出消息(消费者消费消息)。 但对于一个成熟可靠的消息队列来说,所需要解决... 查看详情

各种消息队列如何选择?为何选择rocketmq来保证消息不丢失,及应该采用rocketmq哪种通信模式?

前言消息队列本质上来说,是一个符合先进先出原则的单向队列:一方发送消息并存入消息队列尾部(生产者投递消息),一方从消息队列的头部取出消息(消费者消费消息)。 但对于一个成熟可靠的消息队列来说,所需要解决... 查看详情

rocketmq保证消息不丢失

...cer消息存储在内存中,然后发生宕机,就会导致消息丢失RocketMQ的持久化消息有两种方式:同步刷盘:Broker收到消息后会在持久化到磁盘完成后才发送ack异步刷盘:Broker收到消息存到内存后返回ack,然后Broker定 查看详情

mqkafka——如何保证消息不丢失?如何解决?(代码片段)

一、前言前一篇博客我们介绍了生产者为什么发送消息的吞吐量这么大,其实就是因为,生产者提供了内存缓冲区,把消息打包再发送,从而提高了吞吐量。那么,消息发送过去,到了broker就算是成功了吗... 查看详情

mqkafka——如何保证消息不丢失?如何解决?(代码片段)

一、前言前一篇博客我们介绍了生产者为什么发送消息的吞吐量这么大,其实就是因为,生产者提供了内存缓冲区,把消息打包再发送,从而提高了吞吐量。那么,消息发送过去,到了broker就算是成功了吗... 查看详情

关于mq的几件小事如何保证消息不丢失(代码片段)

1.mq原则数据不能多,也不能少,不能多是说消息不能重复消费,这个我们上一节已解决;不能少,就是说不能丢失数据。如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的。2.丢失数据场景... 查看详情

互联网面试必杀:如何保证消息中间件全链路数据100%不丢失(代码片段)

...境问题。如果你的简历中写了自己熟悉MQ技术(RabbitMQ、RocketMQ、Kafka),而且在项目里有使用的经验,那么非常实际的一个生产环境问题就是:投递消息到MQ,然后从MQ消费消息来处理的这个过程,数据到底会不会丢失。面试官此... 查看详情

rocketmq源码—rocketmq高可用(代码片段)

高可用究竟指的是什么?请参考:关于高可用的系统RocketMQ做了以下的事情来保证系统的高可用多master部署,防止单点故障消息冗余(主从结构),防止消息丢失故障恢复(本篇暂不讨论)那么问题来了:怎么支持多broker的写?... 查看详情

kafka如何保证数据不丢失?不重复?(代码片段)

...据不丢失下面我们分别从这三个方面来学习,kafka是如何保证数据不丢失的一、produce 查看详情

腾讯二面:引入rabbitmq后,你如何保证全链路数据100%不丢失?(代码片段)

原文:blog.csdn.net/hsz2568952354/article/details/86559470我们都知道,消息从生产端到消费端消费要经过3个步骤:生产端发送消息到RabbitMQ;RabbitMQ发送消息到消费端;消费端消费这条消息;这3个步骤中的每一步都... 查看详情

互联网面试必杀:如何保证消息中间件全链路数据100%不丢失:第四篇(代码片段)

前情提示上篇文章:《互联网面试必杀:如何保证消息中间件全链路数据100%不丢失:第三篇》,我们分析了RabbitMQ开启手动ack机制保证消费端数据不丢失的时候,prefetch机制对消费者的吞吐量以及内存消耗的影响。通过分析,我... 查看详情

互联网面试必杀:如何保证消息中间件全链路数据100%不丢失:第二篇(代码片段)

前情提示上一篇文章《互联网面试必杀:如何保证消息中间件全链路数据100%不丢失:第一篇》,我们初步介绍了之前制定的那些消息中间件数据不丢失的技术方案遗留的问题。一个最大的问题,就是生产者投递出去的消息,可能... 查看详情