关键词:
1.原理图:
2.设计实现思路:
1.生产者(发送方)投递事务消息到Broker中,设置该消息为半消息 不可以被消费;
2.开始执行我们的本地事务,将本地事务执行的结果(回滚或者提交)发送给Broker;
3.Broker获取回滚或者提交,如果是回滚的情况则删除该消息、如果是提交的话,该消息就可以被消费者消费;
4.Broker如果没有及时的获取发送方本地事务结果的话,会主动查询本地事务结果。
核心代码发送方
1 @RestController 2 public class ProducerController 3 @Autowired 4 private OrderService orderService; 5 6 @RequestMapping("/sendMsg") 7 public String sendMsg() throws MQClientException, RemotingException, InterruptedException, MQBrokerException 8 String orderId = orderService.sendOrder(); 9 return orderId; 10 11 12
1 @Slf4j 2 @Component 3 @RocketMQTransactionListener(txProducerGroup = "mayiktProducer") 4 public class SyncProducerListener implements RocketMQLocalTransactionListener 5 6 @Autowired 7 private TransationalUtils transationalUtils; 8 9 @Autowired 10 private OrderMapper orderMapper; 11 12 @Override 13 public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) 14 TransactionStatus beginStatus = null; 15 try 16 beginStatus = transationalUtils.begin(); 17 MessageHeaders headers = message.getHeaders(); 18 String objMsg = (String) headers.get("msg"); 19 if (StringUtils.isEmpty(objMsg)) 20 return RocketMQLocalTransactionState.ROLLBACK; 21 22 OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class); 23 int result = orderMapper.addOrder(orderEntity); 24 if (result > 0) 25 transationalUtils.commit(beginStatus); 26 27 log.info("【本地业务执行完毕】 msg:, Object:", message, o); 28 return null; 29 catch (Exception e) 30 e.printStackTrace(); 31 log.error("【执行本地业务异常】 exception message:", e.getMessage()); 32 if (beginStatus != null) 33 transationalUtils.rollback(beginStatus); 34 35 return RocketMQLocalTransactionState.ROLLBACK; 36 37 38 39 @Override 40 public RocketMQLocalTransactionState checkLocalTransaction(Message message) 41 log.info("【执行检查任务】"); 42 MessageHeaders headers = message.getHeaders(); 43 String objMsg = (String) headers.get("msg"); 44 if (StringUtils.isEmpty(objMsg)) 45 return RocketMQLocalTransactionState.UNKNOWN; 46 47 OrderEntity orderEntity = JSONObject.parseObject(objMsg, OrderEntity.class); 48 String orderId = orderEntity.getOrderId(); 49 OrderEntity orderDbEntity = orderMapper.findOrderId(orderId); 50 if (orderDbEntity == null) 51 return RocketMQLocalTransactionState.UNKNOWN; 52 53 return RocketMQLocalTransactionState.COMMIT; 54 55
消费者
1 @Service 2 @RocketMQMessageListener(topic = "orderTopic", consumerGroup = "mayiktTopic") 3 public class OrdeConsumer implements RocketMQListener<String> 4 @Autowired 5 private DispatchMapper dispatchMapper; 6 7 @Override 8 public void onMessage(String msg) 9 OrderEntity orderEntity = JSONObject.parseObject(msg, OrderEntity.class); 10 System.out.println(orderEntity.toString()); 11 12 13
手动事务
1 @Service 2 public class TransationalUtils 3 @Autowired 4 public DataSourceTransactionManager transactionManager; 5 6 public TransactionStatus begin() 7 TransactionStatus transaction = transactionManager.getTransaction(new DefaultTransactionAttribute()); 8 return transaction; 9 10 11 public void commit(TransactionStatus transaction) 12 transactionManager.commit(transaction); 13 14 15 16 public void rollback(TransactionStatus transaction) 17 transactionManager.rollback(transaction); 18 19 20
注:该代码来源于蚂蚁课堂(www.mayikt.com),于本人学习使用。
七.rocketmq极简入门-rocketmq事务消息(代码片段)
...库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。分布式事物的解决方案有很多,如:2PC,TCC,最终一致性 查看详情
rocketmq事务消息原理(代码片段)
...RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部,而不是业务数据库,事... 查看详情
rocketmq实现事务消息方案(代码片段)
RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ之上,并且最近几年的双十一... 查看详情
搞懂分布式技术19:使用rocketmq事务消息解决分布式事务
搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务初步认识RocketMQ的核心模块rocketmq模块rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。rocketmq-client:提供发送、接受消息的客... 查看详情
rocketmq事务消息入门介绍(代码片段)
...概流程说明白,后续再具体细节进行开篇说明。主题引出分布式事务相关内容。RocketMQ事务消息。RocketMQ事务消息如何使用。RocketMQ事务消息是怎么实现的。为什么需要事务消息会查 查看详情
构建基于rocketmq的分布式事务服务(代码片段)
...一篇文章,就来聊一下在软件工程学上的长久的难题——分布式事务(DistributedTransaction)。这个技术也在各个诸如阿里,腾讯等大厂的内部,被广泛地实现,利用及优化。但是由于理论上就有难点,所以分布式事务就隐晦得成... 查看详情
rocketmq的分布式事务机制(事务消息)(代码片段)
...tMQ的事务消息可以用于实现基于可靠消息的最终一致性的分布式事务。文章目录1事务消息简要流程2一阶段半消息不可见的设计3二阶段Commit和Rollback操作4Op消息的设计5Commit消息变得可见6消息回查7最终一致性分布式事务常用于保... 查看详情
11springboot整合rocketmq实现事务消息(代码片段)
...f0c;在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:Half(Prepare)Message——半消息(预处理消息)半消息是一种特殊的消息类型,该状态... 查看详情
rocketmq单机环境搭建(代码片段)
...,带来一篇搭建RocketMQ单机环境的文章,为后面的分布式事务专栏做准备。RocketMQ是阿里巴巴开源的一款高性能分布式消息中间件,有关RocketMQ的详细讲解,后面会单独开设一个RocketMQ专栏。这里,先简单介绍一... 查看详情
rocketmq使用(代码片段)
...试)2,有序消息.(秒杀,等需要有序的消费场景)3,事务消息.(分布式事务场景,需要mq回滚)rocketmq 消费类型常用:1,消费组(同一个组下面只消费一次,用于分布式集群场景)2,订阅模式消费.[consumer.setMessageModel(MessageModel.BROADCASTING);]... 查看详情
spring解决rocketmq发消息与mysql事务一致性(代码片段)
场景用户订单并支付发送消息开通查看文章权限//伪代码@Transactional(rollbackFor=Exception.class)publicvoidpay(longuid,StringorderNO)Orderorder=orderService.selectOrder(uid,orderNO)if(order!=null)Stringstatus= 查看详情
rocketmq事务消息机制(代码片段)
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。一、事务消息的实现步骤事务消息发送步骤:1.发送方将半事务消息发送至RocketMQ服务端。2.RocketMQ服务端将消息持久... 查看详情
分布式事务解决方案|seata|本地消息表|事务消息|最大努力通知|消息丢失重复消费堆积有序
...息堆积怎么办:2.4.RocketMq保证消息有序性:关于分布式事务的基本理论(2PC、3PC等)可以参考链接1.解决方案:主要有seata、基于MQ的最终一致性方案(包括本地消息表、事务消息以及最大努力通知)1.1.... 查看详情
相较于rocketmq的事务消息,本地消息表才是真正的王者(代码片段)
1.概览在分布式系统中,系统间的通信除了大家所熟知的RPC外,基于MQ的异步通信也越来越流行,已经成为基础设施的重要组成部分。而MQ的引入对系统间的数据一致性提出了新的挑战,逐渐成为系统稳定性的一大... 查看详情
rocketmq单机环境搭建(代码片段)
...,带来一篇搭建RocketMQ单机环境的文章,为后面的分布式事务专栏做准备。RocketMQ是阿里巴巴开源的一款高性能分布式消息中间件,有关RocketMQ的详细讲解,后面会单独开设一个RocketMQ专栏。这里,先简单介绍一... 查看详情
分布式事务解决方案
RocketMQ分布式事务https://www.cnblogs.com/linjiqin/p/9561641.html分布式事务解决方案https://www.cnblogs.com/taiyonghai/p/6094350.html 查看详情
spring解决rocketmq发消息与mysql事务一致性(代码片段)
...)Stringstatus="paid";orderDao.updateStatus(uid,orderNo,status);rocketMQTemplate.send("order:status",message(uid,orderNo,order.itemId,status));publicclassOrderStatusArticleListenerimplementsRocketMQListenerpublicvoidonMessage(message)Orderorder=orderService.selectOrder(message... 查看详情
rocketmq详解系列(代码片段)
什么是RocketMQRocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、Ro... 查看详情