关键词:
基于官方文档提供的事务消息API使用的例子来进行分析,这里会把订单系统的业务场景房子里面,加入一些伪代码进行参考。
1. 发送half事务消息出去
package com.mqTrsMessage;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.*;
/**
* @ClassName TransactionProducer
* @Description TODO 事务消息机制
* @Author wushaopei
* @Date 2021/7/18 15:05
* @Version 1.0
*/
public class TransactionProducer
public static void main(String[] args) throws Exception
// 这个东西就是用来接收RcoketMQ回调的一个监听器接口
// 这里会实现执行订单本地事务,commit,rollback,回调查询等逻辑
TransactionListener transactionListener = new TransactionListenerImpl();
// 下面这个就是创建支持事务消息的Producer
// 对这个Producer还得指定要指定一个生产者分组,根据业务指定名字
TransactionMQProducer producer = new
TransactionMQProducer("TestProducerGroup");
producer.setNamesrvAddr("192.168.133.115:9876");
// 下面这个是指定了一个线程池,里面会包含一些线程
// 这个线程池里的线程就是用来处理RocketMQ回调你的请求
ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory()
@Override
public Thread newThread(Runnable r)
Thread thread = new Thread(r);
thread.setName("TestThread");
return thread;
);
// 给事务消息生产者设置对应的线程池,负责执行RocketMQ回调请求
producer.setExecutorService(executorService);
// 给事务消息生产者设置对应的回避函数
producer.setTransactionListener(transactionListener);
//启动实例
producer.start();
// 构造一条订单支付成功的消息,指定Topic是谁
Message msg = new Message("PayOrderSuccessTopic" /* Topic */,
"TestTag" /* Tag */,
"TestKey",
("订单支付消息").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.sendMessageInTransaction(msg,null);
System.out.printf("%s%n", sendResult);
2.加入half消息发送失败,或者没收到half消息响应怎么办?
假如发送half消息失败了,就会在执行“producer.sendMessageInTransaction(msg,null);” 的时候,收到一个异常,发现消息发送失败了。
可以使用下面的代码去关注half消息发送失败的问题:
try
SendResult sendResult = producer.sendMessageInTransaction(msg,null);
catch(Exception e)
// half消息发送失败
// 订单系统执行回滚逻辑,比如说触发支付退款,更新订单状态为“已关闭”
如果一直没有收到half消息发送成功的通知呢?
针对这个问题,可以把发送出去的half消息放在内存里,或者写入本地磁盘文件,后台开启一个线程去检查,如果一个half消息超过比如10分钟都没有收到响应,那就自动触发回滚逻辑。这个补偿机制是MQ客户端内部自己的。
3.如果half消息成功了,如何执行订单本地事务?
public class TransactionListenerImpl implements TransactionListener
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o)
// 执行订单本地事务
// 接着执行本地一连串事务执行结果,去选择执行commit or rollback
try
// 如果本地事务都执行成功了,返回commit
return LocalTransactionState.COMMIT_MESSAGE;
catch (Exception e)
// 本地事务执行失败,回滚所有一切执行过的操作
// 如果本地事务执行失败了,返回rollback,标记half消息无效
return LocalTransactionState.ROLLBACK_MESSAGE;
4.如果没有返回commit或者rollback,如何进行回调?
package com.mqTrsMessage;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
/**
* @ClassName TransactionListenerImpl
* @Description TODO 事务消息监听器
* @Author wushaopei
* @Date 2021/7/18 15:15
* @Version 1.0
*/
public class TransactionListenerImpl implements TransactionListener
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o)
// 执行订单本地事务
// 接着执行本地一连串事务执行结果,去选择执行commit or rollback
try
// 如果本地事务都执行成功了,返回commit
return LocalTransactionState.COMMIT_MESSAGE;
catch (Exception e)
// 本地事务执行失败,回滚所有一切执行过的操作
// 如果本地事务执行失败了,返回rollback,标记half消息无效
return LocalTransactionState.ROLLBACK_MESSAGE;
// 如果因为各种原因,没有返回 commit 或者rollback
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg)
// 查询本地事务,是否执行成功了
Integer status = LocalTrans.transMap.get(msg.getTransactionId());
// 根据本地事务的情况去选择执行 commit or rollback
if(null != status)
switch (status)
case 0: return LocalTransactionState.UNKNOW;
case 1: return LocalTransactionState.COMMIT_MESSAGE;
case 2: return LocalTransactionState.ROLLBACK_MESSAGE;
return LocalTransactionState.COMMIT_MESSAGE;
5.debug测试代码
由截图可知,该事务消息已成功写入MQ进程服务上,并返回了commit。
11springboot整合rocketmq实现事务消息(代码片段)
事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:Half(Prepare)Message——半消息(预处理... 查看详情
rocketmq实现事务消息方案(代码片段)
RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ之上,并且最近几年的双十一... 查看详情
rocketmq源码分析之从官方示例窥探:rocketmq事务消息实现基本思想(代码片段)
RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。官方版本未发布之前,从apacherocketmq第一个版本上线后,代码中存在与事务消息... 查看详情
rocketmq(09)——发送事务消息(代码片段)
发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指... 查看详情
rocketmq事务消息实战(代码片段)
RocketMQ事务消息阅读目录指引:RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想RocketMQ源码分析之RocketMQ事务消息实现原理上篇RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查RocketMQ源码分析... 查看详情
rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)
上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情
rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)
上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情
rocketmq(09)——发送事务消息(代码片段)
发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,... 查看详情
rocketmq事务消息原理(代码片段)
一、RocketMQ事务消息原理: RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存... 查看详情
rocketmq事务消息详解(代码片段)
...#x1f34a;Java学习:Java从入门到精通总结🍊深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想🍊绝对不一样的职场干货:大厂最佳实践经验指南📆最近更新:2023年4月9日🍊个人简介:通信工程本... 查看详情
rocketmq事务消息机制(代码片段)
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。一、事务消息的实现步骤事务消息发送步骤:1.发送方将半事务消息发送至RocketMQ服务端。2.RocketMQ服务端将消息持久... 查看详情
rocketmq事务消息篇之事务消息的使用(代码片段)
前言在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。java示例依赖<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>ro 查看详情
rocketmq事务消息篇之事务消息源码分析(代码片段)
前言RocketMQ事务消息篇(一)之事务消息的介绍RocketMQ事务消息篇(二)之事务消息的使用本文继前两篇对事务消息源码进行分析。事务消息处理基本流程在介绍事务消息的时候,画了一个简单的流程图说明事... 查看详情
rocketmq的分布式事务机制(事务消息)(代码片段)
详细介绍了RocketMQ的事务消息机制,RocketMQ的事务消息可以用于实现基于可靠消息的最终一致性的分布式事务。文章目录1事务消息简要流程2一阶段半消息不可见的设计3二阶段Commit和Rollback操作4Op消息的设计5Commit消息变得可见... 查看详情
rocketmq使用事务消息(代码片段)
...ate.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。原理事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:第一阶段:发送半事务消息,消息发送后,消... 查看详情
rocketmq(10)——发送延时消息(代码片段)
...达了Broker就会写入消息队列,消费者就可以进行消费了。RocketMQ支持我们发送延时消费的消息,即现在发送的消息先发送到Broker,但是需要过一会才能进行消费。如果需要发送延时消息,只需要通过Message的setDelayDelevel()指定一个... 查看详情
rocketmq源码解析-消息消费(代码片段)
RocketMQ源码解析-消息消费1.消费者相关类2.消费者的启动3.消息的拉取4.消费者的负载均衡5.消息的消费6.消费进度管理看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地... 查看详情
rocketmq事务消息
RocketMQ事务消息在实现上充分利用了RocketMQ本身机制,在实现零依赖的基础上,同样实现了高性能、可扩展、全异步等一系列特性。在具体实现上,RocketMQ通过使用HalfTopic以及OperationTopic两个内部队列来存储事务消息推进状态,如... 查看详情