rocketmq事务消息详解(代码片段)

小王曾是少年 小王曾是少年     2023-04-09     176

关键词:

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年4月9日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

事务消息发送流程


半消息实现了分布式环境下的数据一致性的处理,生产者发送事务消息的流程如上图所示,通过对源码的学习,我们可以弄清楚下面几点,也是半消息机制的核心:

  1. 为什么prepare消息不会被Consumer消费?
  2. 事务消息是如何提交和回滚的?
  3. 定时回查本地事务状态的实现细节。

发送事务消息源码分析

发送事务消息方法TransactionMQProducer.sendMessageInTransaction

  • msg:消息
  • tranExecuter:本地事务执行器
  • arg:本地事务执行器参数
public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException 
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) 
            throw new MQClientException("tranExecutor is null", null);
        

        // 忽视消息延迟的属性
        if (msg.getDelayTimeLevel() != 0) 
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        

        Validators.checkMessage(msg, this.defaultMQProducer);
		
		// 发送半消息
        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try 
            sendResult = this.send(msg);
         catch (Exception e) 
            throw new MQClientException("send message Exception", e);
        
		
		// 处理发送半消息的结果
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) 
        	// 发送半消息成功,执行本地事务逻辑
            case SEND_OK: 
                try 
                    if (sendResult.getTransactionId() != null) 
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) 
                        msg.setTransactionId(transactionId);
                    
                    // 执行本地事务逻辑
                    if (null != localTransactionExecuter) 
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                     else if (transactionListener != null) 
                        log.debug("Used new transaction API");
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    
                    if (null == localTransactionState) 
                        localTransactionState = LocalTransactionState.UNKNOW;
                    

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) 
                        log.info("executeLocalTransactionBranch return ", localTransactionState);
                        log.info(msg.toString());
                    
                 catch (Throwable e) 
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                
            
            break;
            // 发送半消息失败,标记本地事务状态为回滚
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        
		
		// 结束事务,设置消息 COMMIT / ROLLBACK
        try 
            this.endTransaction(msg, sendResult, localTransactionState, localException);
         catch (Exception e) 
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        
		
		// 返回事务发送结果
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        
        // 提取Prepared消息的uniqID
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    

该方法的入参包含有一个需要用户实现本地事务的LocalTransactionExecuter executerexecuter中会进行事务操作以保证本地事务和消息发送这两个操作的原子性。


由上面的源码可知:

Producer会首先发送一个半消息到Broker中:

  1. 半消息发送成功,执行事务
  2. 半消息发送失败,不执行事务

半消息发送到Broker后不会被Consumer消费掉的原因有以下两点:

  1. Broker在将消息写入CommitLog时会判断消息类型,如果是prepare或者rollback消息,ConsumeQueueoffset不变
  2. Broker在构造ConsumeQueue时会判断是否是处于prepare或者rollback状态的消息,如果是则不会将该消息放入ConsumeQueue里,Consumer在拉取消息时也就不会拉取到这条消息

Producer会根据半消息的发送结果和本地任务执行结果来决定如何处理事务(commitrollback),方法最后调用了endTransaction来处理事务的执行结果,源码如下:

  • sendResult:发送半消息的结果
  • localTransactionState:本地事务状态
  • localException:执行本地事务逻辑产生的异常
  • RemotingException:远程调用异常
  • MQBrokerExceptionBroker异常
  • InterruptedException:当线程中断异常
  • UnknownHostException:未知host异常

public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException 
        // 解码消息id
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) 
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
         else 
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        

		// 创建请求
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) 
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        

        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;

		// 提交 commit / rollback 消息 
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    

该方法是将事务执行的结果发送给Broker,再由Broker决定是否进行消息投递,执行步骤如下:

  1. 收到消息后先检查是否是事务消息,如果不是事务消息则直接返回
  2. 根据请求头里的offset查询半消息,如果查询结果为空则直接返回
  3. 根据半消息构造新消息,新构造的消息会被重新写入到CommitLog里,rollback消息的消息体为空
  4. 如果是rollback消息,则该消息不会被投递

具体原因上文中已经分析过:只有commit消息才会被Broker投递给consumer

RocketMQ会将commit消息和rollback消息都写入到commitLog里,但rollback消息的消息体为空且不会被投递,CommitLog在删除过期消息时才会将其删除。当事务commit成功之后,RocketMQ会重新封装半消息并将其投递给Consumer端消费。


事务消息回查

Broker发起

相较于普通消息,事务消息主要依赖下面三个类:

  1. TransactionStateService:事务状态服务,负责对事务消息进行管理,包括存储和更新事务消息状态、回查状态等
  2. TranStateTable:事务消息状态存储表,基于MappedFileQueue实现
  3. TranRedoLogTranStateTable的日志,每次写入操作都会记录日志,当Broker宕机时,可以利用这个文件做数据恢复

存储半消息到CommitLog时,使用offset索引到对应的TranStateTable的位置


rocketmq详解系列(代码片段)

什么是RocketMQRocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、Ro... 查看详情

rocketmq事务消息详解(代码片段)

...#x1f34a;Java学习:Java从入门到精通总结🍊深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想🍊绝对不一样的职场干货:大厂最佳实践经验指南📆最近更新:2023年4月9日🍊个人简介:通信工程本... 查看详情

rocketmq事务消息入门介绍(代码片段)

说明周五的时候发了篇:Rocketmq4.3支持事务啦!!!,趁着周末的时候把相关内容看了下,下面的主要内容就是关于RocketMQ事务相关内容介绍了。说明:今天这篇仅仅是入门介绍,并没有涉及到很多细节,先把大概流程说明白,... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指... 查看详情

rocketmq事务消息篇之事务消息的使用(代码片段)

前言在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。java示例依赖<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>ro 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,... 查看详情

rocketmq事务消息篇之事务消息源码分析(代码片段)

前言RocketMQ事务消息篇(一)之事务消息的介绍RocketMQ事务消息篇(二)之事务消息的使用本文继前两篇对事务消息源码进行分析。事务消息处理基本流程在介绍事务消息的时候,画了一个简单的流程图说明事... 查看详情

rocketmq事务消息原理(代码片段)

一、RocketMQ事务消息原理:        RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存... 查看详情

30rocketmq事务消息的代码实现细节(代码片段)

...。1.发送half事务消息出去packagecom.mqTrsMessage;importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.clien 查看详情

11springboot整合rocketmq实现事务消息(代码片段)

事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:Half(Prepare)Message——半消息(预处理... 查看详情

消息中间件-rocketmq详解(从软件安装到案例实现)(代码片段)

RocketMQ内容一、RocketMQ安装二、RocketMQ作用和结构1.RocketMQ特点2RocketMQ执行流程3.RocketMQ作用3.1消息中间件结构图3.2应用解耦3.3削峰填谷4.rocketmq组成部分5.rocketmq基本概念模型三、生产消息的类型有三种四、消费模式有两种五、延时... 查看详情

rocketmq实现事务消息方案(代码片段)

RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ之上,并且最近几年的双十一... 查看详情

rocketmq使用事务消息(代码片段)

目录说明原理事务消息处理流程生产端消费端说明事务消息:1、不支持延时消息和批量消息2、如果消息没有及时提交,默认check15次,可以通过Broker的transactionCheckMax参数配置次数。如果超时15次依然没有得到明确结果... 查看详情

rocketmq事务消息实战(代码片段)

RocketMQ事务消息阅读目录指引:RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想RocketMQ源码分析之RocketMQ事务消息实现原理上篇RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查RocketMQ源码分析... 查看详情

rocketmq源码分析之从官方示例窥探:rocketmq事务消息实现基本思想(代码片段)

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。官方版本未发布之前,从apacherocketmq第一个版本上线后,代码中存在与事务消息... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq消息队列——消息存储详解(代码片段)

2016年双11前后阿里巴巴将RocketMQ捐赠给Apache基金会,很快就吸引了全球众多开源爱好者加入其社区生态中,并在2017年9月成为Apache基金会的顶级项目。利用RocketMQ可以轻松实现应用解耦、流量消峰、消息分发等功能,并... 查看详情