30rocketmq事务消息的代码实现细节(代码片段)

鮀城小帅 鮀城小帅     2022-12-08     494

关键词:

基于官方文档提供的事务消息API使用的例子来进行分析,这里会把订单系统的业务场景房子里面,加入一些伪代码进行参考。

1. 发送half事务消息出去

package com.mqTrsMessage;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.concurrent.*;

/**
 * @ClassName TransactionProducer
 * @Description TODO 事务消息机制
 * @Author wushaopei
 * @Date 2021/7/18 15:05
 * @Version 1.0
 */
public class TransactionProducer 

    public static void main(String[] args) throws Exception 

        // 这个东西就是用来接收RcoketMQ回调的一个监听器接口
        // 这里会实现执行订单本地事务,commit,rollback,回调查询等逻辑
        TransactionListener transactionListener = new TransactionListenerImpl();

        // 下面这个就是创建支持事务消息的Producer
        // 对这个Producer还得指定要指定一个生产者分组,根据业务指定名字
        TransactionMQProducer producer = new
                TransactionMQProducer("TestProducerGroup");

        producer.setNamesrvAddr("192.168.133.115:9876");

        // 下面这个是指定了一个线程池,里面会包含一些线程
        // 这个线程池里的线程就是用来处理RocketMQ回调你的请求
        ExecutorService executorService = new ThreadPoolExecutor(
                2,
                5,
                100,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() 
                    @Override
                    public Thread newThread(Runnable r) 
                        Thread thread = new Thread(r);
                        thread.setName("TestThread");
                        return thread;
                    
        );

        // 给事务消息生产者设置对应的线程池,负责执行RocketMQ回调请求
        producer.setExecutorService(executorService);
        // 给事务消息生产者设置对应的回避函数
        producer.setTransactionListener(transactionListener);
        //启动实例
        producer.start();


        // 构造一条订单支付成功的消息,指定Topic是谁
        Message msg = new Message("PayOrderSuccessTopic" /* Topic */,
                "TestTag" /* Tag */,
                "TestKey",
                ("订单支付消息").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
        //Call send message to deliver message to one of brokers.
        SendResult sendResult = producer.sendMessageInTransaction(msg,null);
        System.out.printf("%s%n", sendResult);
    

2.加入half消息发送失败,或者没收到half消息响应怎么办?

假如发送half消息失败了,就会在执行“producer.sendMessageInTransaction(msg,null);” 的时候,收到一个异常,发现消息发送失败了。

可以使用下面的代码去关注half消息发送失败的问题:

 try

    SendResult sendResult = producer.sendMessageInTransaction(msg,null);

 catch(Exception e)
    // half消息发送失败
    // 订单系统执行回滚逻辑,比如说触发支付退款,更新订单状态为“已关闭”
 

如果一直没有收到half消息发送成功的通知呢?

针对这个问题,可以把发送出去的half消息放在内存里,或者写入本地磁盘文件,后台开启一个线程去检查,如果一个half消息超过比如10分钟都没有收到响应,那就自动触发回滚逻辑。这个补偿机制是MQ客户端内部自己的。

3.如果half消息成功了,如何执行订单本地事务?

public class TransactionListenerImpl implements TransactionListener 
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) 
        // 执行订单本地事务
        // 接着执行本地一连串事务执行结果,去选择执行commit or rollback
        try 
            // 如果本地事务都执行成功了,返回commit
            return LocalTransactionState.COMMIT_MESSAGE;
        catch (Exception e)
            // 本地事务执行失败,回滚所有一切执行过的操作
            // 如果本地事务执行失败了,返回rollback,标记half消息无效
            return LocalTransactionState.ROLLBACK_MESSAGE;
        
    

4.如果没有返回commit或者rollback,如何进行回调?

package com.mqTrsMessage;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @ClassName TransactionListenerImpl
 * @Description TODO 事务消息监听器
 * @Author wushaopei
 * @Date 2021/7/18 15:15
 * @Version 1.0
 */
public class TransactionListenerImpl implements TransactionListener 
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) 
        // 执行订单本地事务
        // 接着执行本地一连串事务执行结果,去选择执行commit or rollback
        try 
            // 如果本地事务都执行成功了,返回commit
            return LocalTransactionState.COMMIT_MESSAGE;
        catch (Exception e)
            // 本地事务执行失败,回滚所有一切执行过的操作
            // 如果本地事务执行失败了,返回rollback,标记half消息无效
            return LocalTransactionState.ROLLBACK_MESSAGE;
        
    

    // 如果因为各种原因,没有返回 commit 或者rollback
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) 
        // 查询本地事务,是否执行成功了
        Integer status = LocalTrans.transMap.get(msg.getTransactionId());
        // 根据本地事务的情况去选择执行 commit or rollback
        if(null != status)
            switch (status)
                case 0: return LocalTransactionState.UNKNOW;
                case 1: return LocalTransactionState.COMMIT_MESSAGE;
                case 2: return LocalTransactionState.ROLLBACK_MESSAGE;
            
        
        return LocalTransactionState.COMMIT_MESSAGE;
    

5.debug测试代码

由截图可知,该事务消息已成功写入MQ进程服务上,并返回了commit。

11springboot整合rocketmq实现事务消息(代码片段)

事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:Half(Prepare)Message——半消息(预处理... 查看详情

rocketmq实现事务消息方案(代码片段)

RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ之上,并且最近几年的双十一... 查看详情

rocketmq源码分析之从官方示例窥探:rocketmq事务消息实现基本思想(代码片段)

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。官方版本未发布之前,从apacherocketmq第一个版本上线后,代码中存在与事务消息... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指... 查看详情

rocketmq事务消息实战(代码片段)

RocketMQ事务消息阅读目录指引:RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想RocketMQ源码分析之RocketMQ事务消息实现原理上篇RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查RocketMQ源码分析... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,... 查看详情

rocketmq事务消息原理(代码片段)

一、RocketMQ事务消息原理:        RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存... 查看详情

rocketmq事务消息详解(代码片段)

...#x1f34a;Java学习:Java从入门到精通总结🍊深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想🍊绝对不一样的职场干货:大厂最佳实践经验指南📆最近更新:2023年4月9日🍊个人简介:通信工程本... 查看详情

rocketmq事务消息机制(代码片段)

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。一、事务消息的实现步骤事务消息发送步骤:1.发送方将半事务消息发送至RocketMQ服务端。2.RocketMQ服务端将消息持久... 查看详情

rocketmq事务消息篇之事务消息的使用(代码片段)

前言在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。java示例依赖<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>ro 查看详情

rocketmq事务消息篇之事务消息源码分析(代码片段)

前言RocketMQ事务消息篇(一)之事务消息的介绍RocketMQ事务消息篇(二)之事务消息的使用本文继前两篇对事务消息源码进行分析。事务消息处理基本流程在介绍事务消息的时候,画了一个简单的流程图说明事... 查看详情

rocketmq的分布式事务机制(事务消息)(代码片段)

详细介绍了RocketMQ的事务消息机制,RocketMQ的事务消息可以用于实现基于可靠消息的最终一致性的分布式事务。文章目录1事务消息简要流程2一阶段半消息不可见的设计3二阶段Commit和Rollback操作4Op消息的设计5Commit消息变得可见... 查看详情

rocketmq使用事务消息(代码片段)

...ate.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。原理事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:第一阶段:发送半事务消息,消息发送后,消... 查看详情

rocketmq(10)——发送延时消息(代码片段)

...达了Broker就会写入消息队列,消费者就可以进行消费了。RocketMQ支持我们发送延时消费的消息,即现在发送的消息先发送到Broker,但是需要过一会才能进行消费。如果需要发送延时消息,只需要通过Message的setDelayDelevel()指定一个... 查看详情

rocketmq源码解析-消息消费(代码片段)

RocketMQ源码解析-消息消费1.消费者相关类2.消费者的启动3.消息的拉取4.消费者的负载均衡5.消息的消费6.消费进度管理看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地... 查看详情

rocketmq事务消息

RocketMQ事务消息在实现上充分利用了RocketMQ本身机制,在实现零依赖的基础上,同样实现了高性能、可扩展、全异步等一系列特性。在具体实现上,RocketMQ通过使用HalfTopic以及OperationTopic两个内部队列来存储事务消息推进状态,如... 查看详情