rocketmq(十六)事务消息(代码片段)

爱是与世界平行 爱是与世界平行     2022-12-08     708

关键词:

1、问题引入

这里的一个需求场景是:工行用户A向建行用户B转账1万元。

我们可以使用同步消息来处理该需求场景:

  1. 工行系统发送一个给B增款1万元的同步消息M给Broker
  2. 消息被Broker成功接收后,向工行系统发送成功ACK
  3. 工行系统收到成功ACK后从用户A中扣款1万元
  4. 建行系统从Broker中获取到消息M
  5. 建行系统消费消息M,即向用户B中增加1万元

这其中是有问题的:若第3步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费。此时建行系统中用户B增加了1万元。出现了数据不一致问题。

2、解决思路

解决思路是,让第1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息。而该思路即使用事务消息。这里要使用分布式事务解决方案。

使用事务消息来处理该需求场景:

  1. 事务管理器TM向事务协调器TC发起指令,开启全局事务

  2. 工行系统发一个给B增款1万元的事务消息M给TC

  3. TC会向Broker发送半事务消息prepareHalf,将消息M预提交到Broker。此时的建行系统是看不到Broker中的消息M的

  4. Broker会将预提交执行结果Report给TC。

  5. 如果预提交失败,则TC会向TM上报预提交失败的响应,全局事务结束;如果预提交成功,TC会调用工行系统的回调操作,去完成工行用户A的预扣款1万元的操作

  6. 工行系统会向TC发送预扣款执行结果,即本地事务的执行状态

  7. TC收到预扣款执行结果后,会将结果上报给TM

    预扣款执行结果存在三种可能性:

    // 描述本地事务执行状态
    public enum LocalTransactionState 
    COMMIT_MESSAGE, // 本地事务执行成功
    ROLLBACK_MESSAGE, // 本地事务执行失败
    UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
    
    
  8. TM会根据上报结果向TC发出不同的确认指令

    • 若预扣款成功(本地事务状态为COMMIT_MESSAGE),则TM向TC发送Global Commit指令
    • 若预扣款失败(本地事务状态为ROLLBACK_MESSAGE),则TM向TC发送Global Rollback指令
    • 若现未知状态(本地事务状态为UNKNOW),则会触发工行系统的本地事务状态回查操作。回查操作会将回查结果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report给TC。TC将结果上报给TM,TM会再向TC发送最终确认指令Global Commit或Global Rollback
  9. TC在接收到指令后会向Broker与工行系统发出确认指令

    • TC接收的若是Global Commit指令,则向Broker与工行系统发送Branch Commit指令。此时Broker中的消息M才可被建行系统看到;此时的工行用户A中的扣款操作才真正被确认
    • TC接收到的若是Global Rollback指令,则向Broker与工行系统发送Branch Rollback指令。此时Broker中的消息M将被撤销;工行用户A中的扣款操作将被回滚

    以上方案就是为了确保 消息投递与 扣款操作 能够在一个事务中,要成功都成功,有一个失败,则全部回滚。

    以上方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案中的消息预提交与预扣款操作间是同步的。

3、基础

3.1、分布式事务

对于分布式事务,通俗地说就是,一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证操作结果的一致性。

3.2、事务消息

RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式。

3.3、半事务消息

暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但是Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息。

3.4、本地事务状态

Producer回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认指令。

// 描述本地事务执行状态
public enum LocalTransactionState 
    COMMIT_MESSAGE, // 本地事务执行成功
    ROLLBACK_MESSAGE, // 本地事务执行失败
    UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果

3.5、消息回查

消息回查,即重新查询本地事务的执行状态。本例就是重新到DB中查看预扣款操作是否执行成功。

注意,消息回查不是重新执行回调操作。回调操作是进行预扣款操作,而消息回查则是查看预扣款操作执行的结果。

引发消息回查的原因最常见的有两个:

1)回调操作返回UNKNWON

2)TC没有接收到TM的最终全局事务确认指令

3.5、RocketMQ中的消息回查设置

关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如:

  • transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默认为60秒
  • transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
  • transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒

4、XA模式三剑客

4.1、XA协议

XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标准。

XA模式中有三个重要组件:TC、TM、RM。

4.2、TC

Transaction Coordinator,事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。

RocketMQ中Broker充当着TC。

4.3、TM

Transaction Manager,事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。

RocketMQ中事务消息的Producer充当着TM。

4.4、RM

Resource Manager,资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

RocketMQ中事务消息的Producer及Broker均是RM。

5、XA模式架构

XA模式是一个典型的2PC,其执行原理如下:

  1. TM向TC发起指令,开启一个全局事务。

  2. 根据业务要求,各个RM会逐个向TC注册分支事务,然后TC会逐个向RM发出预执行指令。

  3. 各个RM在接收到指令后会在进行本地事务预执行。

  4. RM将预执行结果Report给TC。当然,这个结果可能是成功,也可能是失败。

  5. TC在接收到各个RM的Report后会将汇总结果上报给TM,根据汇总结果TM会向TC发出确认指令。

    • 若所有结果都是成功响应,则向TC发送Global Commit指令。
    • 只要有结果是失败响应,则向TC发送Global Rollback指令。
  6. TC在接收到指令后再次向RM发送确认指令。

    事务消息方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案中的消息预提交与预扣款操作间是同步的。

6、注意

  • 事务消息不支持延时消息
  • 对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的情况)

7、代码举例

7.1、定义工行事务监听器

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class ICBCTransactionListener implements TransactionListener 
    // 回调操作方法
    // 消息预提交成功就会触发该方法的执行,用于完成本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) 
        System.out.println("预提交消息成功:" + msg);
        // 假设接收到TAGA的消息就表示扣款操作成功,TAGB的消息表示扣款失败,
        // TAGC表示扣款结果不清楚,需要执行消息回查
        if (StringUtils.equals("TAGA", msg.getTags())) 
            return LocalTransactionState.COMMIT_MESSAGE;
         else if (StringUtils.equals("TAGB", msg.getTags())) 
            return LocalTransactionState.ROLLBACK_MESSAGE;
         else if (StringUtils.equals("TAGC", msg.getTags())) 
            return LocalTransactionState.UNKNOW;
        
        return LocalTransactionState.UNKNOW;
    
    // 消息回查方法
    // 引发消息回查的原因最常见的有两个:
    // 1)回调操作返回UNKNWON
    // 2)TC没有接收到TM的最终全局事务确认指令
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) 
        System.out.println("执行消息回查" + msg.getTags());
        return LocalTransactionState.COMMIT_MESSAGE;
    

7.2、定义事物消息生产者

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.*;
public class TransactionProducer 
    public static void main(String[] args) throws Exception 
        TransactionMQProducer producer = new TransactionMQProducer("tpg");
        producer.setNamesrvAddr("192.168.216.128:9876");
        /**
         * 定义一个线程池
         * @param corePoolSize 线程池中核心线程数量
         * @param maximumPoolSize 线程池中最多线程数
         * @param keepAliveTime 这是一个时间,当线程池中线程数量大于核心线程数量是多余空闲线程的存活时长
         * @param unit 时间单位
         * @param workQueue 临时存放任务的队列,其参数就是队列的长度
         * @param threadFactory 线程工厂
         */
        ExecutorService executorService = new ThreadPoolExecutor(
                2,
                5,
                100,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000),
                new ThreadFactory() 
                    @Override
                    public Thread newThread(Runnable r) 
                        Thread thread = new Thread(r);
                        thread.setName("client-transaction-msg-check-thread");
                        return thread;
                    
                );
        // 为生产者指定一个线程池
        producer.setExecutorService(executorService);
        // 为生产者添加事务监听器
        producer.setTransactionListener(new ICBCTransactionListener());
        producer.start();
        String[] tags = "TAGA", "TAGB", "TAGC";
        for (int i = 0; i < 3; i++) 
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("TTopic", tags[i], body);
            // 发送事务消息
            // 第二个参数用于指定在执行本地事务时要使用的业务参数
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.println("发送结果为:" + sendResult.getSendStatus());
        
    

7.3、定义消费者

直接使用普通消息的SomeConsumer作为消费者即可

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class SomeConsumer 
    public static void main(String[] args) throws MQClientException 
        // 定义一个pull消费者
        // DefaultLitePullConsumer consumer = new        DefaultLitePullConsumer("cg");
        // 定义一个push消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        // 指定nameServer
        consumer.setNamesrvAddr("rocketmqOS:9876");
        // 指定从第一条消息开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
        );
        // 指定消费topic与tag
        consumer.subscribe("TTopic", "*");
        // 指定采用“广播模式”进行消费,默认为“集群模式”
        // consumer.setMessageModel(MessageModel.BROADCASTING);
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() 
            // 一旦broker中有了其订阅的消息就会触发该方法的执行,其返回值为当前consumer消费的状态
            @Override
            public ConsumeConcurrentlyStatus
            consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                // 逐条消费消息
                for (MessageExt msg : msgs) 
                    System.out.println(msg);
                
                //返回消费状态:消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        // 开启消费者消费
        consumer.start();
        System.out.println("Consumer Started");
    

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指... 查看详情

rocketmq事务消息篇之事务消息的使用(代码片段)

前言在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。java示例依赖<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>ro 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,... 查看详情

rocketmq事务消息篇之事务消息源码分析(代码片段)

前言RocketMQ事务消息篇(一)之事务消息的介绍RocketMQ事务消息篇(二)之事务消息的使用本文继前两篇对事务消息源码进行分析。事务消息处理基本流程在介绍事务消息的时候,画了一个简单的流程图说明事... 查看详情

rocketmq事务消息原理(代码片段)

一、RocketMQ事务消息原理:        RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存... 查看详情

30rocketmq事务消息的代码实现细节(代码片段)

...。1.发送half事务消息出去packagecom.mqTrsMessage;importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.clien 查看详情

11springboot整合rocketmq实现事务消息(代码片段)

事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:Half(Prepare)Message——半消息(预处理... 查看详情

rocketmq实现事务消息方案(代码片段)

RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ之上,并且最近几年的双十一... 查看详情

rocketmq使用事务消息(代码片段)

目录说明原理事务消息处理流程生产端消费端说明事务消息:1、不支持延时消息和批量消息2、如果消息没有及时提交,默认check15次,可以通过Broker的transactionCheckMax参数配置次数。如果超时15次依然没有得到明确结果... 查看详情

rocketmq事务消息实战(代码片段)

RocketMQ事务消息阅读目录指引:RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想RocketMQ源码分析之RocketMQ事务消息实现原理上篇RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查RocketMQ源码分析... 查看详情

rocketmq源码分析之从官方示例窥探:rocketmq事务消息实现基本思想(代码片段)

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。官方版本未发布之前,从apacherocketmq第一个版本上线后,代码中存在与事务消息... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

七.rocketmq极简入门-rocketmq事务消息(代码片段)

概述如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交... 查看详情

rocketmq入门到入土事务消息&顺序消息(代码片段)

接上一篇:RocketMQ入门到入土(一)新手也能看懂的原理和实战!一、事务消息的由来1、案例引用官方的购物案例:小明购买一个100元的东西,账户扣款100元的同时需要保证在下游的积分系统给小明这个账号增加100积分。账号系... 查看详情

rocketmq的分布式事务机制(事务消息)(代码片段)

详细介绍了RocketMQ的事务消息机制,RocketMQ的事务消息可以用于实现基于可靠消息的最终一致性的分布式事务。文章目录1事务消息简要流程2一阶段半消息不可见的设计3二阶段Commit和Rollback操作4Op消息的设计5Commit消息变得可见... 查看详情

rocketmq事务消息机制(代码片段)

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。一、事务消息的实现步骤事务消息发送步骤:1.发送方将半事务消息发送至RocketMQ服务端。2.RocketMQ服务端将消息持久... 查看详情

spring解决rocketmq发消息与mysql事务一致性(代码片段)

场景用户订单并支付发送消息开通查看文章权限//伪代码@Transactional(rollbackFor=Exception.class)publicvoidpay(longuid,StringorderNO)Orderorder=orderService.selectOrder(uid,orderNO)if(order!=null)Stringstatus= 查看详情