rocketmq解决分布式事务(代码片段)

lmyupupblogs lmyupupblogs     2023-04-30     568

关键词:

1.原理图:

技术图片

 

 2.设计实现思路:

1.生产者(发送方)投递事务消息到Broker中,设置该消息为半消息 可以被消费;

2.开始执行我们的本地事务,本地事务执行的结果(回滚或者提交)发送Broker;

3.Broker获取回滚或者提交,如果是回滚的情况则删除该消息、如果是提交的话,该消息就可以被消费者消费;

4.Broker如果没有及时的获取发送方本地事务结果的话,会主动查询本地事务结果

 

核心代码发送方

 

 1 @RestController
 2 public class ProducerController 
 3     @Autowired
 4     private OrderService orderService;
 5 
 6     @RequestMapping("/sendMsg")
 7     public String sendMsg() throws MQClientException, RemotingException, InterruptedException, MQBrokerException 
 8         String orderId = orderService.sendOrder();
 9         return orderId;
10 
11     
12 

 

 1 @Slf4j
 2 @Component
 3 @RocketMQTransactionListener(txProducerGroup = "mayiktProducer")
 4 public class SyncProducerListener implements RocketMQLocalTransactionListener 
 5 
 6     @Autowired
 7     private TransationalUtils transationalUtils;
 8 
 9     @Autowired
10     private OrderMapper orderMapper;
11 
12     @Override
13     public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) 
14         TransactionStatus beginStatus = null;
15         try 
16             beginStatus = transationalUtils.begin();
17             MessageHeaders headers = message.getHeaders();
18             String objMsg = (String) headers.get("msg");
19             if (StringUtils.isEmpty(objMsg)) 
20                 return RocketMQLocalTransactionState.ROLLBACK;
21             
22             OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class);
23             int result = orderMapper.addOrder(orderEntity);
24             if (result > 0) 
25                 transationalUtils.commit(beginStatus);
26             
27             log.info("【本地业务执行完毕】 msg:, Object:", message, o);
28             return null;
29          catch (Exception e) 
30             e.printStackTrace();
31             log.error("【执行本地业务异常】 exception message:", e.getMessage());
32             if (beginStatus != null) 
33                 transationalUtils.rollback(beginStatus);
34             
35             return RocketMQLocalTransactionState.ROLLBACK;
36         
37     
38 
39     @Override
40     public RocketMQLocalTransactionState checkLocalTransaction(Message message) 
41         log.info("【执行检查任务】");
42         MessageHeaders headers = message.getHeaders();
43         String objMsg = (String) headers.get("msg");
44         if (StringUtils.isEmpty(objMsg)) 
45             return RocketMQLocalTransactionState.UNKNOWN;
46         
47         OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class);
48         String orderId = orderEntity.getOrderId();
49         OrderEntity orderDbEntity = orderMapper.findOrderId(orderId);
50         if (orderDbEntity == null) 
51             return RocketMQLocalTransactionState.UNKNOWN;
52         
53         return RocketMQLocalTransactionState.COMMIT;
54     
55 

消费者

 1 @Service
 2 @RocketMQMessageListener(topic = "orderTopic", consumerGroup = "mayiktTopic")
 3 public class OrdeConsumer implements RocketMQListener<String> 
 4     @Autowired
 5     private DispatchMapper dispatchMapper;
 6 
 7     @Override
 8     public void onMessage(String msg) 
 9         OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class);
10         System.out.println(orderEntity.toString());
11     
12 
13 

手动事务

 1 @Service
 2 public class TransationalUtils 
 3     @Autowired
 4     public DataSourceTransactionManager transactionManager;
 5 
 6     public TransactionStatus begin() 
 7         TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionAttribute());
 8         return transaction;
 9     
10 
11     public void commit(TransactionStatus transaction) 
12         transactionManager.commit(transaction);
13 
14     
15 
16     public void rollback(TransactionStatus transaction) 
17         transactionManager.rollback(transaction);
18     
19 
20 

注:该代码来源于蚂蚁课堂(www.mayikt.com),于本人学习使用。

七.rocketmq极简入门-rocketmq事务消息(代码片段)

...库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。分布式事物的解决方案有很多,如:2PC,TCC,最终一致性 查看详情

rocketmq事务消息原理(代码片段)

...RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部,而不是业务数据库,事... 查看详情

rocketmq实现事务消息方案(代码片段)

RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ之上,并且最近几年的双十一... 查看详情

搞懂分布式技术19:使用rocketmq事务消息解决分布式事务

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务初步认识RocketMQ的核心模块rocketmq模块rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。rocketmq-client:提供发送、接受消息的客... 查看详情

rocketmq事务消息入门介绍(代码片段)

...概流程说明白,后续再具体细节进行开篇说明。主题引出分布式事务相关内容。RocketMQ事务消息。RocketMQ事务消息如何使用。RocketMQ事务消息是怎么实现的。为什么需要事务消息会查 查看详情

构建基于rocketmq的分布式事务服务(代码片段)

...一篇文章,就来聊一下在软件工程学上的长久的难题——分布式事务(DistributedTransaction)。这个技术也在各个诸如阿里,腾讯等大厂的内部,被广泛地实现,利用及优化。但是由于理论上就有难点,所以分布式事务就隐晦得成... 查看详情

rocketmq的分布式事务机制(事务消息)(代码片段)

...tMQ的事务消息可以用于实现基于可靠消息的最终一致性的分布式事务。文章目录1事务消息简要流程2一阶段半消息不可见的设计3二阶段Commit和Rollback操作4Op消息的设计5Commit消息变得可见6消息回查7最终一致性分布式事务常用于保... 查看详情

11springboot整合rocketmq实现事务消息(代码片段)

...f0c;在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:Half(Prepare)Message——半消息(预处理消息)半消息是一种特殊的消息类型,该状态... 查看详情

rocketmq单机环境搭建(代码片段)

...,带来一篇搭建RocketMQ单机环境的文章,为后面的分布式事务专栏做准备。RocketMQ是阿里巴巴开源的一款高性能分布式消息中间件,有关RocketMQ的详细讲解,后面会单独开设一个RocketMQ专栏。这里,先简单介绍一... 查看详情

rocketmq使用(代码片段)

...试)2,有序消息.(秒杀,等需要有序的消费场景)3,事务消息.(分布式事务场景,需要mq回滚)rocketmq  消费类型常用:1,消费组(同一个组下面只消费一次,用于分布式集群场景)2,订阅模式消费.[consumer.setMessageModel(MessageModel.BROADCASTING);]... 查看详情

spring解决rocketmq发消息与mysql事务一致性(代码片段)

场景用户订单并支付发送消息开通查看文章权限//伪代码@Transactional(rollbackFor=Exception.class)publicvoidpay(longuid,StringorderNO)Orderorder=orderService.selectOrder(uid,orderNO)if(order!=null)Stringstatus= 查看详情

rocketmq事务消息机制(代码片段)

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。一、事务消息的实现步骤事务消息发送步骤:1.发送方将半事务消息发送至RocketMQ服务端。2.RocketMQ服务端将消息持久... 查看详情

分布式事务解决方案|seata|本地消息表|事务消息|最大努力通知|消息丢失重复消费堆积有序

...息堆积怎么办:2.4.RocketMq保证消息有序性:关于分布式事务的基本理论(2PC、3PC等)可以参考链接1.解决方案:主要有seata、基于MQ的最终一致性方案(包括本地消息表、事务消息以及最大努力通知)1.1.... 查看详情

相较于rocketmq的事务消息,本地消息表才是真正的王者(代码片段)

1.概览在分布式系统中,系统间的通信除了大家所熟知的RPC外,基于MQ的异步通信也越来越流行,已经成为基础设施的重要组成部分。而MQ的引入对系统间的数据一致性提出了新的挑战,逐渐成为系统稳定性的一大... 查看详情

rocketmq单机环境搭建(代码片段)

...,带来一篇搭建RocketMQ单机环境的文章,为后面的分布式事务专栏做准备。RocketMQ是阿里巴巴开源的一款高性能分布式消息中间件,有关RocketMQ的详细讲解,后面会单独开设一个RocketMQ专栏。这里,先简单介绍一... 查看详情

分布式事务解决方案

RocketMQ分布式事务https://www.cnblogs.com/linjiqin/p/9561641.html分布式事务解决方案https://www.cnblogs.com/taiyonghai/p/6094350.html 查看详情

spring解决rocketmq发消息与mysql事务一致性(代码片段)

...)Stringstatus="paid";orderDao.updateStatus(uid,orderNo,status);rocketMQTemplate.send("order:status",message(uid,orderNo,order.itemId,status));publicclassOrderStatusArticleListenerimplementsRocketMQListenerpublicvoidonMessage(message)Orderorder=orderService.selectOrder(message... 查看详情

rocketmq详解系列(代码片段)

什么是RocketMQRocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、Ro... 查看详情