rocketmq源码分析之从官方示例窥探:rocketmq事务消息实现基本思想(代码片段)

author author     2023-02-20     475

关键词:

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。

官方版本未发布之前,从apache rocketmq第一个版本上线后,代码中存在与事务消息相关的代码,例如COMMIT、ROLLBACK、PREPARED,在事务消息未开源之前网上对于事务消息的“声音”基本上是使用类似二阶段提交,主要是根据消息系统标志MessageSysFlag中定义来推测的:

TRANSACTION_PREPARED_TYPE
TRANSACTION_COMMIT_TYPE
TRANSACTION_ROLLBACK_TYPE
消息发送者首先发送TRANSACTION_PREPARED_TYPE类型的消息,然后根据事务状态来决定是提交或回滚事务发送commit请求或rollback请求,如果commit/rollback请求丢失后,rocketmq会在指定超时时间后回查事务状态来决定提交或回滚事务。

让我们各自带着自己的理解和猜测,从阅读RocketMQ官方提供的Demo程序入手,试图窥探一些大体的信息。

Demo示例程序位于:/rocketmq-example/src/main/java/org/apache/rocketmq/example/transaction包中。该包中未放置消息消费者,为了验证事务的消息消费情况,我们可以从其他包copy一个消费者,从而先运行生产者,然后运行消费者,判断事务消息的预发放、提交、回滚等效果,二话不说,先运行一下,看下效果再说:
消息发送端运行结果:

SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5767EC0000, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57680F0001, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57681E0002, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D57682B0003, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768380004, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768490005, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768560006, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768640007, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=0], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768730008, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=1], queueOffset=8]
SendResult [sendStatus=SEND_OK, msgId=C0A8010518DC6D06D69C8D5768800009, offsetMsgId=null, messageQueue=MessageQueue [topic=transaction_topic_test, brokerName=broker-a, queueId=2], queueOffset=9]
消息消费端效果:

Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715812, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749010, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001DE8, commitLogOffset=7656, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=5477, toString()=Messagetopic=‘transaction_topic_test‘, flag=0, properties=MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY7, TRAN_MSG=true, CONSUME_START_TIME=1532746024360, UNIQ_KEY=C0A8010518DC6D06D69C8D5768640007, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagC, REAL_QID=0, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId=‘C0A8010518DC6D06D69C8D5768640007‘]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=1, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715768, bornHost=/192.168.1.5:55482, storeTimestamp=1532745749008, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F0000000000001B91, commitLogOffset=7057, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=4496, toString()=Messagetopic=‘transaction_topic_test‘, flag=0, properties=MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY4, TRAN_MSG=true, CONSUME_START_TIME=1532746024361, UNIQ_KEY=C0A8010518DC6D06D69C8D5768380004, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagE, REAL_QID=1, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId=‘C0A8010518DC6D06D69C8D5768380004‘]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=325, queueOffset=0, sysFlag=8, bornTimestamp=1532745715727, bornHost=/192.168.1.5:55482, storeTimestamp=1532745748834, storeHost=/192.168.1.5:10911, msgId=C0A8010500002A9F000000000000193A, commitLogOffset=6458, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=3515, toString()=Messagetopic=‘transaction_topic_test‘, flag=0, properties=MIN_OFFSET=0, REAL_TOPIC=transaction_topic_test, TRANSACTION_CHECK_TIMES=1, MAX_OFFSET=1, KEYS=KEY1, TRAN_MSG=true, CONSUME_START_TIME=1532746024368, UNIQ_KEY=C0A8010518DC6D06D69C8D57680F0001, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagB, REAL_QID=2, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId=‘C0A8010518DC6D06D69C8D57680F0001‘]]
综上所述,服务端发送了10条消息,而消费端只收到3条消息,应该是由于事务回滚,造成只提交了3条消息,为了更加严谨,可以安装一个rocketmq-consonse,更加直观的观察shangshagn‘s上述结果:
技术分享图片

接下来对示例代码进行解读:

1、生产者端代码解读:

public class TransactionProducer
public static void main(String[] args) throws MQClientException, InterruptedException
TransactionListener transactionListener = new TransactionListenerImpl(); // @1
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() br/>@Override
public Thread newThread(Runnable r)
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;

); // @2
producer.setExecutorService(executorService); // @3
producer.setTransactionListener(transactionListener); // @4
producer.start();
String[] tags = new String[] "TagA", "TagB", "TagC", "TagD", "TagE";
for (int i = 0; i < 10; i++) // @5
try
Message msg =
new Message("transaction_topic_test", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);

            Thread.sleep(10);
         catch (MQClientException | UnsupportedEncodingException e) 
            e.printStackTrace();
        
    
    for (int i = 0; i < 100000; i++)      //这里只是阻止生产者过早退出,导致事务消息的相关机制无法运行
        Thread.sleep(1000);
    
    producer.shutdown();


代码@1:创建TransactionListener 实例,字面理解为事务消息事件监听器,下文详细对其进行展开。
代码@2:ExecutorService executorService,创建一个线程池,其线程的名称前缀”client-transaction-msg-check-thread“,从字面理解为客户端事务消息状态检测线程,我们可以大胆的猜测一下是不是这个线程池调用TransactionListener方法,完成对事务消息的检测呢?【这里只是作者的猜测,大家不能当真,在作者后续文章发布后,如果该观点错误,会加以修复,这里写出来,主要是想分享一下我读源码的方法】。
br/>代码@3:为事务消息发送者设置线程池。
代码@4:为事务消息发送者设置事务监听器。
代码@5:发送10条消息。

2、TransactionListener代码解读

public class TransactionListenerImpl implements TransactionListener
private AtomicInteger transactionIndex = new AtomicInteger(0);

private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) 
    int value = transactionIndex.getAndIncrement();
    int status = value % 3;
    localTrans.put(msg.getTransactionId(), status);
    return LocalTransactionState.UNKNOW;


@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) 
    Integer status = localTrans.get(msg.getTransactionId());
    if (null != status) 
        switch (status) 
            case 0:
                return LocalTransactionState.UNKNOW;
            case 1:
                return LocalTransactionState.COMMIT_MESSAGE;
            case 2:
                return LocalTransactionState.ROLLBACK_MESSAGE;
        
    
    return LocalTransactionState.COMMIT_MESSAGE;


executeLocalTransaction方法:记录本地事务的事务状态,这里其实现就是循环设置事务消息的状态为0,1,2,demo中是把消息的状态数据存放在一个Map中。实际应用时通常会持久化消息的事务状态,例如数据库或缓存。
checkLocalTransaction方法,事务回查业务实现,查本地事务表,判断事务的状态如为0:UNKNOW,1:COMMIT_MESSAGE;ROLLBACK_MESSAGE。这里就能解释,生产者连续发10条消息,因为只有3条消息的事务状态为COMMIT_MESSAGE,故消息消费者只能消费3条。
到这里,基本上还是可以得知事务消息的实现方式,基本与文章开头所示的“网上声音”实现类似,下一节将详细分析TransactionMQProducer事务消息发送的实现细节。

郑重声明:本文主要是展示事务消息的基本使用,本文所下的结论还仅仅是作者的猜测,下一篇文章,将重点分析事务消息的实现细节,本文一个非常重要的目的,是向读者朋友们展示作者学习源码的一个方法,总结为:先做全面了解(网上,官方文档)、然后加以自己的思考,从Demo实例入手学习,将学习任务分解之,边写边看。

这算不算文末有彩蛋呢?呵呵,下一篇见:详细分析RocketMQ事务消息的实现细节。

本文节选自书籍《RocketMQ技术内幕:RocketMQ架构设计与实现原理》
技术分享图片



rocketmq事务消息实战(代码片段)

RocketMQ事务消息阅读目录指引:RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想RocketMQ源码分析之RocketMQ事务消息实现原理上篇RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查RocketMQ源码分析... 查看详情

消息中间件rocketmq源码解析:事务消息

650)this.width=650;"src="http://www.yunai.me/images/common/wechat_mp.jpeg"style="border:2pxsolidrgb(238,238,238);margin-top:0px;"/>关注微信公众号:【芋艿的后端小屋】有福利:RocketMQ/MyCAT/Sharding-JDBC 所有源码分析文章列表Rock 查看详情

rocketmq基础概念剖析,并分析一下producer的底层源码

这篇博客聊聊关于RocketMQ相关的东西,主要聊的点有RocketMQ的功能使用、RocketMQ的底层运行原理和部分核心逻辑的源码分析。至于我们为什么要用MQ、使用MQ能够为我们带来哪些好处、MQ在社区有哪些实现、社区的各个MQ的优劣对比... 查看详情

rocketmq源码合集

消息队列中间件RocketMQ源码分析——Message发送与接收消息队列中间件RocketMQ源码分析——Message存储分布式消息队列RocketMQ源码分析——Message拉取与消费(上)分布式消息队列RocketMQ源码分析——Message拉取与消费(下&#x... 查看详情

memcached源码分析之从set命令开始说起

作者:Calix如果直接把memcached的源码从main函数开始说,恐怕会有点头大,所以这里以一句经典的“SET”命令简单地开个头,算是回忆一下memcached的作用,后面的结构篇中关于命令解析部分主要也是围绕着SET命令展开分析,... 查看详情

技术干货|源码解析github上14.1kstar的rocketmq

前言ApacheRocketMQ作为广为人知的开源消息中间件,诞生于阿里巴巴,于2016年捐赠给了Apache。从RocketMQ4.0到如今最新的v4.7.1,不论是在阿里巴巴内部还是外部社区,都赢得了广泛的关注和好评。本文将站在发送方视角,通过阅读Rock... 查看详情

免费专辑

...唯有坚持不懈』又名『中间件兴趣圈』,由畅销书《RocketMQ技术内幕》作者、RocketMQ官方社区优秀布道师维护,旨在成体系剖析互联网JAVA主流中间件,涵盖RocketMQ、Netty、Elasticjob、Dubbo、Canal、Sentinel、等互联网大厂必... 查看详情

rocketmq源码合集

消息队列中间件RocketMQ源码分析——Message发送与接收消息队列中间件RocketMQ源码分析——Message存储分布式消息队列RocketMQ源码分析——Message拉取与消费(上)分布式消息队列RocketMQ源码分析——Message拉取与消费(下&#x... 查看详情

rocketmq原理学习--索引

 1、RocketMQ原理学习--RocketMQ源码运行 2、RocketMQ原理学习--RocketMQ整体架构窥探 3、RocketMQ原理学习--消息类型 4、RocketMQ原理学习--NameServer 5、RocketMQ原理学习---生产者普通消息发送 6、RocketMQ原理学习---生产者... 查看详情

《逐梦旅程windows游戏编程之从零开始》源码分析2——gdi

GDI:图形设备接口1.取得设备环境的句柄(如屏幕)使用BeginPaint和EndPaint这两个函数,或者使用GetDC和ReleaseDC这两个函数。关于函数的具体说明可以参考mdsn文档。一个GDI程序通用框架:1#include<windows.h>23#defineWINDOW_WIDTH800//为窗口宽... 查看详情

数据库中间件mycat源码分析——调试环境搭建

650)this.width=650;"src="http://www.yunai.me/images/common/wechat_mp.jpeg"style="text-align:justify;height:auto;margin:auto;"/>关注**微信公众号:【芋艿的后端小屋】**有福利:RocketMQ/MyCAT/Sharding-JDBC 所有源码分析文章列表Rock 查看详情

源码分析rocketmq与运维实战

 RocketMQ是笔者当前最突出的亮点,正是由于在CSDN中连载RocketMQ,最终促成了《RocketMQ技术内幕》一书的出版,也凭借此专栏的高质量,最终成为CSDN2020年年度博客之星TOP2。RocketMQ专栏目前已经输出48篇文章,并... 查看详情

rocketmq源码分析——高可用

概述本文主要解析Namesrv、Broker如何实现高可用,Producer、Consumer怎么与它们通信保证高可用。Namesrv高可用启动多个Namesrv实现高可用。相较于Zookeeper、Consul、Etcd等,Namesrv是一个超轻量级的注册中心,提供命名服务。2.1Broker注册到... 查看详情

rocketmq源码分析——message存储(代码片段)

CommitLog结构CommitLog、MappedFileQueue、MappedFile的关系如下:CommitLog:MappedFileQueue:MappedFile=1:1:N。反应到系统文件如下:···Yunai-MacdeMacBook-Pro-2:commitlogyunai$pwd/Users/yunai/store/commitlogYunai-MacdeMacBook-P 查看详情

spring源码窥探之:注解方式的aop原理

AOP入口代码分析通过注解的方式来实现AOP1.@EnableAspectJAutoProxy通过@Import注解向容器中注入了AspectJAutoProxyRegistrar这个类,而它在容器中的名字是org.springframework.aop.config.internalAutoProxyCreator。2.AspectJAutoProxyRegistrar实现了ImportBean 查看详情

rocketmq源码分析3-consumer消息获取

使用rocketmq的大体消息发送过程如下:在前面已经分析过MQ的broker接收生产者客户端发过来的消息的过程,此文主要讲述订阅者获取消息的过程,或者说broker是怎样将消息传递给消费者客户端的,即上面时序图中拉取消息(pullmessag... 查看详情

rocketmq源码分析之rocketmq事务消息实现原下篇(事务提交或回滚)

本文将重点分析RocketMQBroker如何处理事务消息提交、回滚命令,根据前面的介绍,其入口EndTransactionProcessor#proce***equest:OperationResultresult=newOperationResult();if(MessageSysFlag.TRANSACTION_COMMIT_TYPE==requestHeader.getCommitOrRollback())//@1result=this.broke... 查看详情

rocketmq事务消息篇之事务消息源码分析(代码片段)

前言RocketMQ事务消息篇(一)之事务消息的介绍RocketMQ事务消息篇(二)之事务消息的使用本文继前两篇对事务消息源码进行分析。事务消息处理基本流程在介绍事务消息的时候,画了一个简单的流程图说明事... 查看详情