rocketmq事务消息原理(代码片段)

张维鹏 张维鹏     2022-12-12     417

关键词:

一、RocketMQ事务消息原理:

        RocketMQ 在 4.3 版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部,而不是业务数据库,事务消息解决的是生产端的消息发送与本地事务执行的原子性问题,这里的界限一定要清楚,是确保 MQ 生产端正确无误地将消息发送出来,没有多发,也不会漏发,至于发送后消费端有没有正常的消费消息,这种异常场景将由 MQ 消息消费失败重试机制来保证。

        RocketMQ 设计中的 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者;而 RocketMQ 本身提供的存储机制则为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。

1、RocketMQ 实现事务一致性的原理:

备注:本地事务的回滚依赖于本地DB的ACID特性,订阅方的成功消费由 MQ Server 的失败重试机制进行保证。

(1)正常情况:在事务主动方服务正常,没有发生故障的情况下,发消息流程如下:

步骤①:MQ 发送方向 MQ Server 发送 half 消息,MQ Server 标记消息状态为 prepared,此时该消息 MQ 订阅方是无法消费到的

步骤②:MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经成功接收

步骤③:发送方开始执行本地事务逻辑

步骤④:发送方根据本地事务执行结果向 MQ Server 提交二次确认,commit 或 rollback

最终步骤:MQ Server 如果收到的是 commit 操作,则将半消息标记为可投递,MQ订阅方最终将收到该消息;若收到的是 rollback 操作则删除 half 半消息,订阅方将不会接受该消息;如果本地事务执行结果没响应或者超时,则 MQ Server 回查事务状态,具体见步骤(2)的异常情况说明。

(2)异常情况:在断网或者应用重启等异常情况下,图中的步骤④提交的二次确认超时未到达 MQ Server,此时的处理逻辑如下:

步骤⑤:MQ Server 对该消息进行消息回查

步骤⑥:发送方收到消息回查后,检查该消息的本地事务执行结果

步骤⑦:发送方根据检查得到的本地事务的最终状态再次提交二次确认。

最终步骤:MQ Server基于 commit/rollback 对消息进行投递或者删除

2、RocketMQ事务消息的实现流程:

        以 RocketMQ 4.5.2 版本为例,事务消息有专门的一个队列 RMQ_SYS_TRANS_HALF_TOPIC,所有的 prepare 消息都先往这里放,当消息收到 Commit 请求后,就将消息转移到真实的 Topic 队列里,供 Consumer 消费,同时向 RMQ_SYS_TRANS_OP_HALF_TOPIC 塞一条消息。简易流程图如下:

        当应用模块的事务因为中断或者其他的网络原因导致无法立即响应的,RocketMQ 会当做 UNKNOW 处理,对此 RocketMQ 事务消息提供了一个补救方案:定时回查事务消息的事务执行状态,简易流程图如下:

二、Springboot 整合 RocketMQ 实现事务消息:

        该部分将从 "下订单 + 扣减库存"的案例来介绍 SpringBoot 如何整合 RocketMQ 并使用事务消息保证最终一致性。核心思路是订单服务(生产端)向 RocketMQ 发送库存扣减消息,再执行本地订单生成逻辑,最后交由 RocketMQ 通知 库存服务扣减库存并保证库存扣减消息被正常消费。

        案例中使用到的服务分为两个,订单服务和库存服务;涉及到的数据库表主要有三个,订单表、存储表,本地事务状态表。由于这几个表都比较简单,这里就不将对应的建表语句粘贴出来了,同样对应的 Pojo对象、Dao层、Service层 代码也不粘贴出来了,下面只展示核心逻辑的代码。

1、启动 RocketMQ 服务端:

RocketMQ的安装与部署请参考这篇文章:https://blog.csdn.net/a745233700/article/details/122531859

2、在父pom文件中引入依赖:

	<!-- rocketmq 事务消息 -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>

3、生产端代码:

        生产端的核心逻辑就是向 RocketMQ 投递事务消息,并执行本地事务,最后将本地事务的执行结果通知到 RocketMQ

(1)RocketMQ相关配置:

在 application.properties 配置文件中添加以下配置:

rocketmq.name-server=172.28.190.101:9876
rocketmq.producer.group=order_shop

(2)创建一个监听类:

实现 TransactionListener 接口,在实现的数据库事务提交方法executeLocalTransaction() 和回查事务状态方法checkLocalTransaction() 中模拟结果

/**
 * rocketmq 事务消息回调类
 */
@Slf4j
@Component
public class OrderTransactionListener implements TransactionListener

    @Resource
    private ShopOrderMapper shopOrderMapper;

    /**
     * half消息发送成功后回调此方法,执行本地事务
     *
     * @param message 回传的消息,利用transactionId即可获取到该消息的唯一Id
     * @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
     * @return 返回事务状态,COMMIT:提交  ROLLBACK:回滚  UNKNOW:回调
     */
    @Override
    @Transactional
    public LocalTransactionState executeLocalTransaction(Message message, Object arg)
    
        log.info("开始执行本地事务:订单信息:" + new String(message.getBody()));
        String msgKey = new String(message.getBody());
        ShopOrderPojo shopOrder = JSONObject.parseObject(msgKey, ShopOrderPojo.class);

        int saveResult;
        LocalTransactionState state;
        try
        
            //修改为true时,模拟本地事务异常
            boolean imitateException = true;
            if(imitateException)
            
                throw new RuntimeException("更新本地事务时抛出异常");
            

            // 生成订单,本地事务的回滚依赖于DB的ACID特性,所以需要添加Transactional注解。当本地事务提交失败时,返回ROLLBACK_MESSAGE,则会回滚rocketMQ中的half message,保证分布式事务的一致性。
            saveResult = shopOrderMapper.insert(shopOrder);
            state = saveResult == 1 ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;

            // 更新本地事务并将事务号持久化,为后续的幂等做准备
            // TransactionDao.add(transactionId)
        
        catch (Exception e)
        
            log.error("本地事务执行异常,异常信息:", e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        

        //修改为true时,模拟本地事务超时,对于超时的消息,rocketmq会调用checkLocalTransaction方法回查本地事务执行状况
        boolean imitateTimeout = false;
        if(imitateTimeout)
        
            state = LocalTransactionState.UNKNOW;
        

        log.info("本地事务执行结果:msgKey=" + msgKey + ",execute state:" + state);
        return state;
    


    /**
     * 回查本地事务接口
     *
     * @param messageExt 通过获取transactionId来判断这条消息的本地事务执行状态
     * @return 返回事务状态,COMMIT:提交  ROLLBACK:回滚  UNKNOW:回调
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt)
    
        log.info("调用回查本地事务接口:msgKey=" +  new String(messageExt.getBody()));

        String msgKey = new String(messageExt.getBody());
        ShopOrderPojo shopOrder = JSONObject.parseObject(msgKey, ShopOrderPojo.class);

        // 备注:此处应使用唯一ID查询本地事务是否执行成功,唯一ID可以使用事务的transactionId。但为了验证方便,只查询DB的订单表是否存在对应的记录
        // TransactionDao.isExistTx(transactionId)
        List<ShopOrderPojo> list = shopOrderMapper.selectList(new QueryWrapper<ShopOrderPojo>()
                .eq("shop_id", shopOrder.getShopId())
                .eq("user_id", shopOrder.getUserId()));

        LocalTransactionState state = list.size() > 0 ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
        log.info("调用回查本地事务接口的执行结果:" +  state);

        return state;
    

        为了方便验证,上面 Demo 使用了两个 boolean 变量 imitateException、imitateTimeout 分别模拟了事务执行异常和超时的情况,只需要将布尔值设置为 true 即可。

(3)投递事务消息:

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ShopOrderServiceImpl extends ServiceImpl<ShopOrderMapper, ShopOrderPojo> implements ShopOrderService

    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private OrderTransactionListener orderTransactionListener;

    /**
     * 发送事务消息
     */
    @Override
    public boolean sendOrderRocketMqMsg(ShopOrderPojo shopOrderPojo)
    
        String topic = "storage";
        String tag = "reduce";

        // 设置监听器,此处如果使用MQ其他版本,可能导致强转异常
        ((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(orderTransactionListener);

        //构建消息体
        String msg = JSONObject.toJSONString(shopOrderPojo);
        org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(msg).build();
        //发送事务消息,由消费者进行进行减少库存
        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic + ":" + tag , message, null);

        log.info("Send transaction msg result: " + sendResult);
        return sendResult.getSendStatus() == SendStatus.SEND_OK;
    

4、消费端代码:

        消费端的核心逻辑就是监听 MQ,接收消息;接收到消息之后扣减库存

(1)RocketMQ相关配置:

在 application.properties 配置文件中添加以下配置:

rocketmq.name-server=172.28.190.101:9876
rocketmq.consumer.group=order_shop

(2)消费监听类:

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

/**
 * 库存管理消费者类
 **/
@Component
@RocketMQMessageListener (consumerGroup = "order_storage", topic = "storage")
public class StorageConsumerListener implements RocketMQListener<String>

    @Resource
    private TStorageService tStorageService;

    /**
     * rocketMQ消费者
     */
    @Override
    public void onMessage(String message)
    
        System.out.println("消费者开始消费:从MQ中获取的消息是:" + message);
        ShopOrderPojo shopOrder = JSONObject.parseObject(message, ShopOrderPojo.class);

        // 1、幂等校验,防止消息重复消费--此处省略相关的代码逻辑:
        // TransactionDao.isExistTx(transactionId)

        // 2、执行消息消费操作--减少商品库存:
        TStoragePojo shop = tStorageService.getById(shopOrder.getShopId());
        shop.setNum(shop.getNum() - 1);
        boolean updateResult = tStorageService.updateById(shop);

        // 3、添加事务操作记录--此次省略代码:
        // TransactionDao.add(transactionId)

        System.out.println("消费者完成消费:操作结果:" + updateResult);
    

至此,一个完整的基于 RocketMQ 事务消息实现的分布式事务的最终一致性就完成了。


相关阅读:

常见的服务器架构入门:从单体架构、EAI 到 SOA 再到微服务和 ServiceMesh

常见分布式理论(CAP、BASE)和一致性协议(Gosssip协议、Raft一致性算法)

一致性哈希算法原理详解

Nacos注册中心的部署与用法详细介绍

Nacos配置中心用法详细介绍

SpringCloud OpenFeign 远程HTTP服务调用用法与原理

什么是RPC?RPC框架dubbo的核心流程

服务容错设计:流量控制、服务熔断、服务降级

sentinel 限流熔断神器详细介绍

Sentinel 规则持久化到 apollo 配置中心

Sentinel-Dashboard 与 apollo 规则的相互同步

Spring Cloud Gateway 服务网关的部署与使用详细介绍

Spring Cloud Gateway 整合 sentinel 实现流控熔断

Spring Cloud Gateway 整合 knife4j 聚合接口文档

常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)

分布式事务Seata原理

RocketMQ事务消息原理


参考文章:https://www.cnblogs.com/huangying2124/p/11702761.html

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq事务消息实战(代码片段)

RocketMQ事务消息阅读目录指引:RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想RocketMQ源码分析之RocketMQ事务消息实现原理上篇RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查RocketMQ源码分析... 查看详情

rocketmq使用事务消息(代码片段)

目录说明原理事务消息处理流程生产端消费端说明事务消息:1、不支持延时消息和批量消息2、如果消息没有及时提交,默认check15次,可以通过Broker的transactionCheckMax参数配置次数。如果超时15次依然没有得到明确结果... 查看详情

rocketmq事务消息原理及使用方法解析(代码片段)

...#x1f34a;Java学习:Java从入门到精通总结🍊深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想🍊绝对不一样的职场干货:大厂最佳实践经验指南📆最近更新:2023年3月24日🍊个人简介:通信工程... 查看详情

rocketmq源码分析之从官方示例窥探:rocketmq事务消息实现基本思想(代码片段)

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。官方版本未发布之前,从apacherocketmq第一个版本上线后,代码中存在与事务消息... 查看详情

rocketmq入门到入土事务消息&顺序消息(代码片段)

接上一篇:RocketMQ入门到入土(一)新手也能看懂的原理和实战!一、事务消息的由来1、案例引用官方的购物案例:小明购买一个100元的东西,账户扣款100元的同时需要保证在下游的积分系统给小明这个账号增加100积分。账号系... 查看详情

rocketmq事务消息入门介绍(代码片段)

说明周五的时候发了篇:Rocketmq4.3支持事务啦!!!,趁着周末的时候把相关内容看了下,下面的主要内容就是关于RocketMQ事务相关内容介绍了。说明:今天这篇仅仅是入门介绍,并没有涉及到很多细节,先把大概流程说明白,... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指... 查看详情

rocketmq使用事务消息(代码片段)

...ate.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。原理事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:第一阶段:发送半事务消息,消息发送后,消... 查看详情

rocketmq事务消息篇之事务消息的使用(代码片段)

前言在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。java示例依赖<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>ro 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,... 查看详情

rocketmq事务消息篇之事务消息源码分析(代码片段)

前言RocketMQ事务消息篇(一)之事务消息的介绍RocketMQ事务消息篇(二)之事务消息的使用本文继前两篇对事务消息源码进行分析。事务消息处理基本流程在介绍事务消息的时候,画了一个简单的流程图说明事... 查看详情

[转帖]rocketmq原理&最佳实践(代码片段)

Rocketmq原理&最佳实践https://www.jianshu.com/p/2838890f3284 彦帧关注142018.08.0515:48:44字数3,451阅读174,582一、MQ背景&选型消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下... 查看详情

30rocketmq事务消息的代码实现细节(代码片段)

...。1.发送half事务消息出去packagecom.mqTrsMessage;importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.clien 查看详情

11springboot整合rocketmq实现事务消息(代码片段)

事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:Half(Prepare)Message——半消息(预处理... 查看详情

rocketmq实现事务消息方案(代码片段)

RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ之上,并且最近几年的双十一... 查看详情

rocketmq解决分布式事务(代码片段)

...Id=orderService.sendOrder();9returnorderId;101112 1@Slf4j2@Component3@RocketMQTransactionListener(txProducerGroup="mayiktProducer")4p 查看详情

七.rocketmq极简入门-rocketmq事务消息(代码片段)

概述如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交... 查看详情