rocketmq事务消息原理及使用方法解析(代码片段)

小王曾是少年 小王曾是少年     2023-03-30     635

关键词:

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年3月24日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

RocketMQ事务消息

RocketMQ针对事务消息扩展了两个相关的概念:

1. 半消息

半消息(Half Message)是一种特殊的消息类型,处于这个状态的消息暂时不能被Consumer消费。

当一条事务消息被成功投递到Broker上,但Broker没有收到Producer的二次确认时,该事务消息就处于暂时不可消费的状态,这种消息就是半消息


2. 消息状态回查

由于网络抖动、系统宕机等等原因,可能导致ProducerBroker发送的二次确认信息没有送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态。

这个机制主要是用来解决分布式事务中的超时问题。


上图是RocketMQ官网提供的事务消息流程图,执行步骤如下:

  1. ProducerBroker端发送半消息
  2. Broker发送ACK确认,表示半消息发送成功
  3. Producer执行本地事务
  4. 本地事务完毕,根据事务的状态,ProducerBroker发送二次确认消息,确认该半消息的CommitRollback状态。Broker收到二次确认消息之后:如果是Commit状态,则直接将消息发送到Consumer端执行消费逻辑;如果是Rollback状态,则会直接将其标记为失败,不会发送给Consumer
  5. 针对超时情况,Broker主动向Producer发起消息回查
  6. Producer处理回查消息,返回对应的本地事务执行结果
  7. Broker针对消息回查的结果,执行【步骤4】的操作

适用场景举例

以转账系统为例,假设A要向B转账100元,执行本地事务和发送异步消息的过程应该同时保持成功或失败,即A的账户扣款成功后,就一定要发消息发送出去,最直观的思路可能有两个:

1. 先发消息

这种策略的流程如下:

存在的问题是: 如果消息发送成功,但后续A扣款失败了,消费端仍然会消费这条消息,进而向B账户里打钱,数据就出现不一致的情况了。


2. 后发消息


存在的问题是: 如果扣款成功,但是发送消息失败,就会出现A已经扣钱了,但B账户里没有入账的情况,同样也是无法接受的。

出现上述情况的根本原因是本地事务和发送消息这两个操作并不是原子的,因此也就无法做到同时失败或同时成功,所以数据一致性难以保障。


解决上述问题的方法就是上面提到的半消息

如上图所示,执行本地事务之前先发送一个半消息,此时还不能被消费者消费,只有当本地事务执行完毕并发送二次确认消息之后,半消息才能被Consumer消费。

如此以来就保证了多个系统数据的数据一致性,前提是系统不需要保证数据的强一致性


使用示例

发送事务消息

RocketMQ发送事务消息设计到消息发送、消息回查、消息二次确认等过程,因此这个过程可能会“稍显复杂”

发送事务消息使用的是TransactionMQProducer,一个简单的demo如下:

public class TransactionProducer 
    public static void main(String[] args) throws MQClientException 
        TransactionCheckListener transactionCheckListener = new TransactionCheckListener() 
            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt msg) 
                return null;
            
        ;

        TransactionMQProducer producer = new TransactionMQProducer("GROUP A");

        producer.setCheckThreadPoolMinSize(2);
        producer.setCheckThreadPoolMaxSize(2);
        producer.setCheckRequestHoldMax(2000);
        producer.setTransactionCheckListener(transactionCheckListener);
        producer.start();

        String[] tags = new String[]"TAG A", "TAG B", "TAG C";
        LocalTransactionExecuter transactionExecuter = new LocalTransactionExecuter() 
            @Override
            public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) 
                return null;
            
        ;
        for (int i = 0; i < 10; i++) 
            Message msg = new Message("TEST", tags[i % tags.length], "KEY " + i, ("HELLO, ROCKETMQ" + i).getBytes());
            SendResult result = producer.sendMessageInTransaction(msg, transactionExecuter, null);
            try 
                Thread.sleep(100);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
        producer.shutdown();
    


事务回查

checkLocalTransaction是事务消息回查监听方法,可以获取本地事务状态,根据事务的状态来确定是否要发送二次确认消息,或者进行事务回滚操作。

消息回查事务的状态由以下几种情况:

  1. LocalTransactionState.ROLLBACK_MESSAGE:事务回滚
  2. LocalTransactionState.COMMIT_MESSAGE:事务提交
  3. LocalTransactionState.UNKNOW:未知状态,此时Broker会定时重新查询Producer消息的状态,直到出现前面两种情况。
public interface TransactionListener 
    LocalTransactionState checkLocalTransaction(final MessageExt msg);


事务执行

executeLocalTransaction方法用于执行本地事务,如果本地事务执行成功则进行事务提交,否则进行事务回滚,如果是UNKNOW状态的话,Broker就会定时回查Producer的消息状态,直到彻底成功或失败。

public interface TransactionListener 
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);


rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq事务消息实战(代码片段)

RocketMQ事务消息阅读目录指引:RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想RocketMQ源码分析之RocketMQ事务消息实现原理上篇RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查RocketMQ源码分析... 查看详情

rocketmq原理及解析

一.什么是消息队列RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、... 查看详情

rocketmq使用事务消息(代码片段)

目录说明原理事务消息处理流程生产端消费端说明事务消息:1、不支持延时消息和批量消息2、如果消息没有及时提交,默认check15次,可以通过Broker的transactionCheckMax参数配置次数。如果超时15次依然没有得到明确结果... 查看详情

rocketmq原理及解析

1如何保证消息的可靠性传输生产者丢失数据:生产者设置同步提交消息,并且手动提交,将消息同步刷盘到从节点后在返回成功;broker:主从复制,同步刷盘;消费端:消费重试,只有返回consu... 查看详情

rocketmq事务机制的底层实现原理解析

...候,为什么消费者就消费不到呢;其本质的一个原因就是RocketMQ一旦发现你发送的是一条half消息,它不会把这个half消息的offset写入OrderPaySuccessTopic中的ConsumeQueue里去,它会把这条half消息写入到自己内部的一个“RMQ_SYS_TRANS_HALF_TOPI... 查看详情

28rocketmq:事务消息机制的底层实现原理

1.half消息在commit前对消费者不可见的原因回顾:当订单系统去发送一个half消息给MQ的时候。对于这个half消息,红包系统在这时是看不到它的,也无法去消费这条消息并进行处理。ConsumeQueue的可见原理场景解析:当... 查看详情

rocketmq使用事务消息(代码片段)

...ate.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。原理事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:第一阶段:发送半事务消息,消息发送后,消... 查看详情

rocketmq事务消息篇之事务消息的使用(代码片段)

前言在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。java示例依赖<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>ro 查看详情

rocketmq源码分析之从官方示例窥探:rocketmq事务消息实现基本思想(代码片段)

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。官方版本未发布之前,从apacherocketmq第一个版本上线后,代码中存在与事务消息... 查看详情

rocketmq事务消息入门介绍(代码片段)

说明周五的时候发了篇:Rocketmq4.3支持事务啦!!!,趁着周末的时候把相关内容看了下,下面的主要内容就是关于RocketMQ事务相关内容介绍了。说明:今天这篇仅仅是入门介绍,并没有涉及到很多细节,先把大概流程说明白,... 查看详情

深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的消息发送的全部流程和落盘原理分析指南(代码片段)

前言介绍RocketMQ目前在国内应该是比较流行的MQ了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指... 查看详情

rocketmq实现事务消息方案(代码片段)

RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ之上,并且最近几年的双十一... 查看详情

消息中间件rocketmq源码解析:事务消息

650)this.width=650;"src="http://www.yunai.me/images/common/wechat_mp.jpeg"style="border:2pxsolidrgb(238,238,238);margin-top:0px;"/>关注微信公众号:【芋艿的后端小屋】有福利:RocketMQ/MyCAT/Sharding-JDBC 所有源码分析文章列表Rock 查看详情

rocketmq源码分析4-事务消息实现原理

为什么消息要具备事务能力参见还是比较清晰的。简单的说就是在你业务逻辑过程中,需要发送一条消息给订阅消息的人,但是期望是此逻辑过程完全成功完成之后才能使订阅者收到消息。业务逻辑过程假设是这样的:逻辑部分... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,... 查看详情