rocketmq使用事务消息(代码片段)

乐观男孩 乐观男孩     2023-03-01     514

关键词:

说明

事务消息:
1、不支持延时消息和批量消息
2、如果消息没有及时提交,默认check 15次,可以通过Broker的transactionCheckMax参数配置次数。
如果超时15次依然没有得到明确结果,将会打印异常信息,具体的处理策略可以通过复写AbstractTransactionCheckListener类实现
3、每次check的时间间隔可以通过Broker的transactionTimeout配置,也可以在消息中增加CHECK_IMMUNITY_TIME_IN_SECONDS属性指定
4、事务状态:LocalTransactionState.COMMIT_MESSAGE、LocalTransactionState.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。

原理

事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:
第一阶段:发送半事务消息,消息发送后,消息是对消费者透明的,也就是该消息还不属于可消费消息,消费者无法消费。
第二阶段:执行本地事务,本地执行事务后提交消息。
(1)、如果事务执行失败,则回滚消息;
(2)、如果事务执行成功,则提交消息,提交后消费者可消费到消息;
(3)、如果事务执行成功,但消息提交失败,RocketMQ还提供了回查机制:如果一段时间过后,没有提交/回滚半事务消息,RocketMQ会定时回查一定的次数,获取本地事务的状态以决定是提交还是回滚消息。如果回查一定的次数后依然没有获取到本地事务的明确状态,则消息会被放到死信队列,由人工确认如何处理。

事务消息处理流程


1、生产端发送半事务消息到服务端
2、服务端返回半事务消息发送成功响应。注意,此时的消息对消费端是不可见的,不可被消费
3、发送方执行本地事务
4、执行完本地事务后,客户端同步服务端提交/回滚消息
5、如果服务端在一定的时间内,等不到4的回应,则定时进行回查,询问客户端的本地事务状态。
6、客户端检查本地事务状态
7、根据本地事务执行情况,告知服务端,服务端决定是提交消息还是丢弃消息。

生产端

@Test
    public void sendMessage() throws Exception 
        //事务生产者
        TransactionMQProducer producer = new TransactionMQProducer("defaultGroup");
        producer.setNamesrvAddr(SpringUtil.getBean(RocketMqConfig.class).getNamesrvAddr());
        //设置检查本地事务状态的线程池
        //producer.setExecutorService(null);
        //本地事务执行监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);
        producer.start();
        Message message = new Message(RocketMqUtil.TOPIC, "transaction", "transaction-message".getBytes(Charset.forName("UTF-8")));
        //发送事务消息
        producer.sendMessageInTransaction(message, null);
    

    class TransactionListenerImpl implements TransactionListener 

        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) 
            //执行本地事务(数据库)操作......
            int num = new Random().nextInt(10);
            if (num < 3) 
                //本地事务执行成功,提交消息
                return LocalTransactionState.COMMIT_MESSAGE;
             else if (num < 6) 
                //本地事务执行失败,删除消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            
            //等待本地事务check,即执行checkLocalTransaction()方法
            return LocalTransactionState.UNKNOW;
        

        /**
         * 回查逻辑
         * @param msg
         * @return
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) 
            int num = new Random().nextInt(10);
            if (num < 3) 
                //提交消息
                return LocalTransactionState.COMMIT_MESSAGE;
             else if (num < 6) 
                //删除消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            
            return LocalTransactionState.UNKNOW;
        
    

发送事务消息步骤:
1、初始化TransactionMQProducer实例
2、指定check线程池(回查线程池)
3、为Producer添加自定义事务监听器。自定义事务监听器需实现TransactionListener接口,通过覆盖接口的executeLocalTransaction方法执行本地事务,返回事务状态,客户端会根据本地事务状态通知服务端,决定是否提交消息;通过覆盖接口的checkLocalTransaction方法提供回查机制,当在一定的时间内服务端获取不到本地事务执行状态,将通过该方法回查事务状态,以决定消失是否需要提交。
4、通过Producer.sendMessageInTransaction发送事务消息。
消费者正常消费逻辑

消费端

@Test
    public void consumeMessage() throws Exception 
        DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
        defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                            ConsumeConcurrentlyContext consumeConcurrentlyContext) 
                log.info("消费到消息条数:", list.size());
                list.stream().map(messageExt -> new String(messageExt.getBody(), Charset.forName("UTF-8")))
                        .map(String::new).forEach(System.out::println);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        defaultMQPushConsumer.start();
        Thread.sleep(5000L);
    

消费端正常消费消息即可。

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指... 查看详情

rocketmq使用事务消息(代码片段)

目录说明原理事务消息处理流程生产端消费端说明事务消息:1、不支持延时消息和批量消息2、如果消息没有及时提交,默认check15次,可以通过Broker的transactionCheckMax参数配置次数。如果超时15次依然没有得到明确结果... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,... 查看详情

rocketmq事务消息篇之事务消息源码分析(代码片段)

前言RocketMQ事务消息篇(一)之事务消息的介绍RocketMQ事务消息篇(二)之事务消息的使用本文继前两篇对事务消息源码进行分析。事务消息处理基本流程在介绍事务消息的时候,画了一个简单的流程图说明事... 查看详情

30rocketmq事务消息的代码实现细节(代码片段)

...。1.发送half事务消息出去packagecom.mqTrsMessage;importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.clien 查看详情

rocketmq事务消息实战(代码片段)

RocketMQ事务消息阅读目录指引:RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想RocketMQ源码分析之RocketMQ事务消息实现原理上篇RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查RocketMQ源码分析... 查看详情

rocketmq源码分析之从官方示例窥探:rocketmq事务消息实现基本思想(代码片段)

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。官方版本未发布之前,从apacherocketmq第一个版本上线后,代码中存在与事务消息... 查看详情

rocketmq事务消息原理(代码片段)

一、RocketMQ事务消息原理:        RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq使用事务消息(代码片段)

...ate.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。原理事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:第一阶段:发送半事务消息,消息发送后,消... 查看详情

rocketmq的分布式事务机制(事务消息)(代码片段)

详细介绍了RocketMQ的事务消息机制,RocketMQ的事务消息可以用于实现基于可靠消息的最终一致性的分布式事务。文章目录1事务消息简要流程2一阶段半消息不可见的设计3二阶段Commit和Rollback操作4Op消息的设计5Commit消息变得可见... 查看详情

rocketmq事务消息原理及使用方法解析(代码片段)

...#x1f34a;Java学习:Java从入门到精通总结🍊深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想🍊绝对不一样的职场干货:大厂最佳实践经验指南📆最近更新:2023年3月24日🍊个人简介:通信工程... 查看详情

11springboot整合rocketmq实现事务消息(代码片段)

事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:Half(Prepare)Message——半消息(预处理... 查看详情

rocketmq(十六)事务消息(代码片段)

1、问题引入这里的一个需求场景是:工行用户A向建行用户B转账1万元。我们可以使用同步消息来处理该需求场景:工行系统发送一个给B增款1万元的同步消息M给Broker消息被Broker成功接收后,向工行系统发送成功ACK工... 查看详情

rocketmq实现事务消息方案(代码片段)

RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ之上,并且最近几年的双十一... 查看详情

rocketmq使用(代码片段)

rocketmq 基本使用可以看官网和官网给的demo.https://github.com/apache/rocketmq/tree/master/example这里主要说明几个点:rocketmq  发送类型常用:1,普通消息.(可以获取发送结果,失败了重试)2,有序消息.(秒杀,等需要有序的消费场景)3,事... 查看详情

七.rocketmq极简入门-rocketmq事务消息(代码片段)

概述如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交... 查看详情