rocketmq实现事务消息方案(代码片段)

liudalei liudalei     2023-04-11     374

关键词:

RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在 RocketMQ 之上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便利性支持。

RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ 本身提供的存储机制为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。

在RocketMQ 4.3后实现了完整的事务消息,实际上其实是对本地消息表的一个封装,将本地消息表移动到了MQ内部,解决 Producer 端的消息发送与本地事务执行的原子性问题。

 

技术图片

 

执行流程如下:

为方便理解我们还以注册送积分的例子来描述 整个流程。

Producer 即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责新增积分。

1、Producer 发送事务消息

Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。

本例中,Producer 发送 ”增加积分消息“ 到MQ Server。

2、MQ Server回应消息发送成功

MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。

3、Producer 执行本地事务

Producer 端执行业务代码逻辑,通过本地数据库事务控制。

本例中,Producer 执行添加用户操作。

4、消息投递

若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”增加积分消息“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;

若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删除”增加积分消息“ 。

MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。

5、事务回查

如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。

以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可。

RoacketMQ提供RocketMQLocalTransactionListener接口:

public interface RocketMQLocalTransactionListener 
/**
- 发送prepare消息成功此方法被回调,该方法用于执行本地事务
- @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
- @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
- @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
/**
- @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
- @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
RocketMQLocalTransactionState checkLocalTransaction(Message msg);

发送事务消息:
以下是RocketMQ提供用于发送事务消息的API:

TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    //设置TransactionListener实现
    producer.setTransactionListener(transactionListener);
    //发送事务消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);

下面给出代码示例

package com.example.rocketmq.common;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Slf4j
@Service
public class SyncProducer 
 
    @Resource
    private RocketMQTemplate rocketMQTemplate;
 
    public TransactionSendResult sendSyncMessage(String msg, String topic, String tag)

        log.info("【发送消息】:", msg);

        //构建消息体
        JSONObject jsonObject = new JSONObject();

        jsonObject.put("message",msg);

        Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONString()).build();

        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("tx_group", topic, message, tag);

        log.info("【发送状态】:", result.getLocalTransactionState());

        return result;

    
 
package com.example.rocketmq.common;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_group")
public class SyncProducerListener implements RocketMQLocalTransactionListener 

    private ConcurrentHashMap<String, Object> localTrans = new ConcurrentHashMap<>();

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) 

        try 

            log.info("【本地业务执行完毕】 msg:, Object:", message, o);

            localTrans.put(message.getHeaders().getId()+"", message.getPayload());

            return RocketMQLocalTransactionState.COMMIT;

         catch (Exception e) 

            e.printStackTrace();

            log.error("【执行本地业务异常】 exception message:", e.getMessage());

            return RocketMQLocalTransactionState.ROLLBACK;

        

    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) 

        log.info("【执行检查任务】");

        return RocketMQLocalTransactionState.UNKNOWN;

    
package com.example.rocketmq.common;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "TransactionTopic",consumerGroup = "spring_boot_consumer_group")
@Slf4j
public class TxmsgConsumer implements RocketMQListener<String> 

    @Override
    public void onMessage(String s) 

        log.info("开始消费消息:",s);

    

application.propertis

rocketmq.name-server=localhost:9876
rocketmq.producer.group = spring_boot_producer_group

pom.xml

       <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>

测试:

package com.example.rocketmq.controller;


import com.example.rocketmq.common.SyncProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController 


    @Autowired
    private SyncProducer syncProducer;

    @GetMapping("test")
    String test() 

        syncProducer.sendSyncMessage("我随便传的一条测试消息,内容保密","TransactionTopic","TransactionTAG");

        return "ok";
    


结果:

 技术图片

 

 

技术图片

 

rocketmq事务消息原理(代码片段)

一、RocketMQ事务消息原理:        RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存... 查看详情

rocketmq的分布式事务机制(事务消息)(代码片段)

详细介绍了RocketMQ的事务消息机制,RocketMQ的事务消息可以用于实现基于可靠消息的最终一致性的分布式事务。文章目录1事务消息简要流程2一阶段半消息不可见的设计3二阶段Commit和Rollback操作4Op消息的设计5Commit消息变得可见... 查看详情

30rocketmq事务消息的代码实现细节(代码片段)

...。1.发送half事务消息出去packagecom.mqTrsMessage;importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.clien 查看详情

11springboot整合rocketmq实现事务消息(代码片段)

事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:Half(Prepare)Message——半消息(预处理... 查看详情

rocketmq事务消息入门介绍(代码片段)

说明周五的时候发了篇:Rocketmq4.3支持事务啦!!!,趁着周末的时候把相关内容看了下,下面的主要内容就是关于RocketMQ事务相关内容介绍了。说明:今天这篇仅仅是入门介绍,并没有涉及到很多细节,先把大概流程说明白,... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指... 查看详情

rocketmq源码分析之从官方示例窥探:rocketmq事务消息实现基本思想(代码片段)

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。官方版本未发布之前,从apacherocketmq第一个版本上线后,代码中存在与事务消息... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,... 查看详情

rocketmq事务消息实战(代码片段)

我们以一个订单流转流程来举例,例如订单子系统创建订单,需要将订单数据下发到其他子系统(与第三方系统对接)这个场景,我们通常会将两个系统进行解耦,不直接使用服务调用的方式进行交互。其业务实现步骤通常为:... 查看详情

rocketmq事务消息机制(代码片段)

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。一、事务消息的实现步骤事务消息发送步骤:1.发送方将半事务消息发送至RocketMQ服务端。2.RocketMQ服务端将消息持久... 查看详情

27发送消息零丢失方案:rocketmq事务消息的实现流程分析

...常见的网络故障之类的问题,导致消息就丢失了。在RocketMQ中,有这么一个功能,就是事务消息的功能,凭借这个事务级的消息机制,就可以让我们确保订单系统推送出去的消息一定会成功写入MQ里 查看详情

七.rocketmq极简入门-rocketmq事务消息(代码片段)

概述如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交... 查看详情

rocketmq事务消息篇之事务消息的使用(代码片段)

前言在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。java示例依赖<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>ro 查看详情

rocketmq事务消息篇之事务消息源码分析(代码片段)

前言RocketMQ事务消息篇(一)之事务消息的介绍RocketMQ事务消息篇(二)之事务消息的使用本文继前两篇对事务消息源码进行分析。事务消息处理基本流程在介绍事务消息的时候,画了一个简单的流程图说明事... 查看详情

rocketmq使用事务消息(代码片段)

...ate.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。原理事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:第一阶段:发送半事务消息,消息发送后,消... 查看详情

浅谈rocketmq如何保证消息不丢失(代码片段)

RocketMQ如何保证消息不丢失?一、明确丢失消息场景二、RocketMQ避免消息丢失解决方案1、消息生产者使用事务消息2、配置同步刷盘+Dledger主从架构3、消息消费者端使用同步消费机制4、设计降级方案MQ如何避免消息不丢失&#x... 查看详情