rocketmq的分布式事务机制(事务消息)(代码片段)

刘Java 刘Java     2023-02-03     187

关键词:

详细介绍了RocketMQ的事务消息机制,RocketMQ的事务消息可以用于实现基于可靠消息的最终一致性的分布式事务。

分布式事务常用于保证两个独立的系统之间的数据或者状态的一致性,常见的方案有TCC(Try-Confirm-Cancel),XA两阶段提交方案,可靠消息最终一致性方案,最大努力通知方案等等,最常见的就是最终一致性方案。

使用案例:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md#6-%E6%B6%88%E6%81%AF%E4%BA%8B%E5%8A%A1%E6%A0%B7%E4%BE%8B使用TransactionMQProducer作为事务消息生产者,通过sendMessageInTransaction方法发送事务消息,通过设置TransactionListener(executeLocalTransaction执行本地事务、checkLocalTransaction检查本地事务状态)执行回调,可以通过setExecutorService设置自定义线程池来处理这些检查请求。

每个事务消息都有唯一ID,在TransactionListener中,可以通过事务id来使得执行本地事务与检查本地事务这两个操作操作产生联系。这两个方法返回LocalTransactionState.COMMIT_MESSAGE、LocalTransactionState.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW这三种状态。

事务消息不支持延时消息和批量消息。设置了DelayTimeLevel后,数据事务提交后(或是回查数据库事务完成后),将消息写入目标Topic时,由于DelayTimeLevel的干扰,目标Topic将变成SCHEDULE_TOPIC_XXXX,同时REAL_TOPIC变成RMQ_SYS_TRANS_HALF_TOPIC,真实的Topic在这个环节已经丢失。

1 事务消息简要流程

Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC两阶段提交的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,用以达到消息的最终一致性的目的,并不是强一致性的分布式事务。

下面是事务消息的流程图:

注意,RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败,通过Rocket+MQ的事务消息可以实现可靠消息最终一致性方案的分布式事务。

假设有两个独立部署的系统A、B,A系统提供下单服务,B系统提供扣款服务,下单之后必须扣款,它们的数据库也是独立部署的,这就是一个典型的分布式事务的场景。下面看看RocketMQ的事务消息如何实现最终一致性的分布式事务。

由于是基于2PC的思想,RocketMQ事务消息同样分为两个阶段:Prepared阶段和确认阶段:

  1. Prepared阶段,首先Producer发送一个half message(也称为半消息)给RocketMQ。这个半消息区别于普通消息,即使消息被成功发送到了Broker端,也不会立即可见,需要 Producer对消息的二次确认后,Consumer才可能去消费它。
  2. 随后RocketMQ服务端响应半消息的写入结果,如果是写入成功,那么执行Producer(A系统)的本地事务,如果响应写入失败,此时本地事务逻辑不执行即可。
  3. 确认阶段,Producer(A系统)的本地事务执行可能成功或者失败,Producer将根据结果返回一个COMMIT或者ROLLBACK状态给RocketMQ,RocketMQ收到这个再确认消息之后,对半消息也执行COMMIT或者ROLLBACK,如果是COMMIT,那么半消息对Consumer(B系统)可见(可以消费),如果是ROLLBACK,那么半消息被“回滚”,Consumer永不可见(永不可消费)。

补偿机制:在第二阶段中,如果RocketMQ迟迟收不到Producer的返回结果,即这条半消息的状态一直是pending,则会从服务端发起一次“回查”调用。Producer收到回查消息,检查回查消息对应的本地事务的状态。根据本地事务状态,重新Commit或者Rollback。

可以知道,补偿阶段用于解决确认阶段Producer的消息Commit或者Rollback发生超时或者失败(比如确认阶段Producer挂了)的情况。

这里还有几个问题或者技术点需要深入的了解一下。

2 一阶段半消息不可见的设计

RocketMQ会判断写入的如果事务消息,则RocketMQ对消息的Topic和Queue等属性进行替换,改变主题为RMQ_SYS_TRANS_HALF_TOPIC,同时将原来的Topic和Queue信息存储到消息的属性中。

因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,并且由于消费组未订阅该替换的主题,故而消费者无法感知消息的存在,不会消费。

随后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

其实改变消息主题是RocketMQ的常用“套路”,RocketMQ的延时消息的实现机制也是这个逻辑,非常的巧妙。

3 二阶段Commit和Rollback操作

在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。

对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是为了区别于这条消息没有确定的状态(Pending),需要一个操作来标识这条消息的最终状态。

RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。

引入Op消息后,事务消息无论是Commit还是RollBack都会记录一个Op操作。只不过Commit相对于Rollback只是在写入Op消息前多了一步创建Half消息的索引的过程,即根据Half消息恢复出了以前的普通消息并在内部走了一遍普通的发送的流程,这样消费者就能看到普通消息了。

4 Op消息的设计

RocketMQ将Op消息写入到另一个特定的全局内部Topic中,像Half消息的Topic一样,不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。

5 Commit消息变得可见

在执行二阶段Commit操作时,需要构建出Half消息的索引,让消息变得对生产者可见。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。

所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后在内部走一遍消息写入流程即可使得Half消息对客户端可见。

6 消息回查

如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。

Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查。

RocketMQ的Broker端会开启一个定时任务(1分钟一次),从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,通过对比Half消息和Op消息对未确定状态的事务消息发起回查事务状态的请求。

事务消息将在 Broker 配置文件中的参数 transactionTimeout (默认为6s?)这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。

回查时将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker根据返回的事务状态来决定是提交或回滚消息。

多次回查可能都不会成功,因此RocketMQ并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,比如Producer客户端崩溃的时候,那么rocketmq默认回滚该消息,此时可能出现本地事务成功而消息回滚的局面,那么事务的最终一致性也将无法保证。

7 最终一致性

RocketMQ的事务消息仅仅保证本地事务和MQ消息发送到消息队列形成原子性,它们才是同一个事务,但不保证消费者是否能一定消费成功。

那消费者消费失败怎么办?因为消费端RocketMQ有重试机制,如果不是代码问题一般重试几次就能成功,这里我们要保证消息消费的幂等性,即多次消费同一个消息对系统的状态没有影响,或者说不会影响最终正确的结果。比如上面的案例中,发生了重复消费,可能就会重复调用多次扣款的接口,我们要保证对同一个消息多次调用和一次调用的最终结果是一致的,而不是调用几次接口就扣款几次。

如果消费者一直执行失败,几乎可以断定就是代码有问题所以才引起的异常。如果多次失败并重试达到一定次数之后,可以先将该异常记录下来,通常是记录到数据库中,后续由人工处理,通过这样来让事务达到最终的一致性。

因此RocketMQ的事务消息不是强一致性的,而是保证最终一致性,并且可能需要人工介入。

目前,生产级别采用的各种分布式事务解决方案也几乎都是最终一致性的。试想一下,如果要保证强一致性的,即必须实时的保证数据的一致性,那么一定需要同步阻塞,此时将会阻塞大量的服务,降低消息分布式系统的可用性和并发度,这是更加不可容忍的。实际上也有强一致性的分布式事务方案,比如基于数据库的2PC实现,但是几乎很少使用,或者说,建议小公司谨慎使用分布式事务,能不用就不用。

与最终一致性对应的业务是,通常在客户进行操作之后,不会立即返回客户成功的信号,而是返回一个“业务正在办理中,成功了会通知你”、“钱款两小时内到账”等友好的延时提醒。

相关文章:

RocketMQ

如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!

rocketmq事务消息机制(代码片段)

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。一、事务消息的实现步骤事务消息发送步骤:1.发送方将半事务消息发送至RocketMQ服务端。2.RocketMQ服务端将消息持久... 查看详情

rocketmq事务消息入门介绍(代码片段)

...概流程说明白,后续再具体细节进行开篇说明。主题引出分布式事务相关内容。RocketMQ事务消息。RocketMQ事务消息如何使用。RocketMQ事务消息是怎么实现的。为什么需要事务消息会查 查看详情

rocketmq事务消息原理(代码片段)

...RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部,而不是业务数据库,事... 查看详情

搞懂分布式技术19:使用rocketmq事务消息解决分布式事务

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务初步认识RocketMQ的核心模块rocketmq模块rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。rocketmq-client:提供发送、接受消息的客... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指... 查看详情

11springboot整合rocketmq实现事务消息(代码片段)

...f0c;在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:Half(Prepare)Message——半消息(预处理消息)半消息是一种特殊的消息类型,该状态... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,... 查看详情

七.rocketmq极简入门-rocketmq事务消息(代码片段)

...库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。分布式事物的解决方案有很多,如:2PC,TCC,最终一致性 查看详情

rocketmq解决分布式事务(代码片段)

1.原理图:  2.设计实现思路:1.生产者(发送方)投递事务消息到Broker中,设置该消息为半消息 不可以被消费;2.开始执行我们的本地事务,将本地事务执行的结果(回滚或者提交)发送给Broker;3.Broker获取回滚或者... 查看详情

rocketmq事务消息详解(代码片段)

...析事务消息回查Broker发起事务消息发送流程半消息实现了分布式环境下的数据一致性的处理,生产者发送事务消息的流程如上图所示,通过对源码的学习,我们可以弄清楚下面几点,也是半消息机制的核心:... 查看详情

rocketmq支持事务消息机制(代码片段)

事务消费我们经常支付宝转账余额宝,这是日常生活的一件普通小事,但是我们思考支付宝扣除转账的钱之后,如果系统挂掉怎么办,这时余额宝账户并没有增加相应的金额,数据就会出现不一致状况了。上... 查看详情

rocketmq事务消息

RocketMQ事务消息在实现上充分利用了RocketMQ本身机制,在实现零依赖的基础上,同样实现了高性能、可扩展、全异步等一系列特性。在具体实现上,RocketMQ通过使用HalfTopic以及OperationTopic两个内部队列来存储事务消息推进状态,如... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

构建基于rocketmq的分布式事务服务(代码片段)

...一篇文章,就来聊一下在软件工程学上的长久的难题——分布式事务(DistributedTransaction)。这个技术也在各个诸如阿里,腾讯等大厂的内部,被广泛地实现,利用及优化。但是由于理论上就有难点,所以分布式事务就隐晦得成... 查看详情

浅谈rocketmq的事务消息

...可以直接列举电商中大家都容易懂的业务场景。比如,在分布式场景中用户取消订单,增加用户账户余额。这个业务简单易懂,业务大致流程是两个服务协同完成业务,订单服务取消订单,账户服务新增用户账户余额。场景有了... 查看详情

rocketmq事务消息篇之事务消息的使用(代码片段)

前言在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。java示例依赖<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>ro 查看详情

rocketmq事务消息篇之事务消息源码分析(代码片段)

前言RocketMQ事务消息篇(一)之事务消息的介绍RocketMQ事务消息篇(二)之事务消息的使用本文继前两篇对事务消息源码进行分析。事务消息处理基本流程在介绍事务消息的时候,画了一个简单的流程图说明事... 查看详情