rocketmq事务消息篇之事务消息的使用(代码片段)

NetWhite NetWhite     2022-12-12     763

关键词:

前言

RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。

java示例

依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>$rocketmq.version</version>
        </dependency>

        <!--        用到acl需要添加这个依赖,不使用,忽略这个依赖-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>$rocketmq.version</version>
        </dependency>
    </dependencies>

示例代码 

@Slf4j
public class TransactionProducer 

    // name server地址
    private static final String NAME_SRV = "127.0.0.1:9876";
    // topic名称
    private static final String TOPIC = "test_topic";
    // 生产者组名
    private static final String PRODUCER_GROUP = "producer_transaction_group_demo";
    // access key of ACL
    private static final String ACCESS_KEY = "xuxiaodong";
    // secret key of ACL
    private static final String SECRET_KEY = "12345678";

    private static final Map<String, Boolean> resultCache = new HashMap<>();

    public static void main(String[] args) throws Exception 
//        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
        // 支持acl事务生产者
        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)));
        producer.setNamesrvAddr(NAME_SRV);
        // 设置事务监听器
        producer.setTransactionListener(new TransactionListener() 
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) 
                // 1.执行本地事务逻辑
                log.info("execute local transaction, msgId: ", msg.getTransactionId());
                log.info("this is params: ", arg);
                return LocalTransactionState.UNKNOW;
            

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) 
                // 2.事务回查
                String id = msg.getTransactionId();
                log.info("check local transaction state, msgId: ", id);
                // 事务执行失败了,回滚消息
                if (resultCache.containsKey(id) && !resultCache.get(id)) 
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                
                // 事务执行成功
                resultCache.put(id, true);
                return LocalTransactionState.COMMIT_MESSAGE;
            
        );
        producer.start();
        Message message = new Message();
        message.setTopic(TOPIC);
        message.setBody("This is a transaction message".getBytes());
        SendResult result = producer.sendMessageInTransaction(message, "params");
        log.info(result.toString());
    

TransactionListener说明

TransactionListener的两个接口是rocketmq二阶段执行本地事务及事务回查的入口。其返回值LocalTransactionState如下:

  • COMMIT_MESSAGE:提交,本地事务执行成功,返回状态
  • ROLLBACK_MESSAGE:回滚,本地事务执行失败,返回状态
  • UNKNOW:未知,其它情况,返回该状态,会进行事务回查

executeLocalTransaction()方法不同返回值场景说明:

  • COMMIT_MESSAGE:事务提交,消费方可以消费消息,不会执行checkLocalTransaction方法
  • ROLLBACK_MESSAGE:事务回滚,消费方不会消费这条消息,不会执行checkLocalTransaction方法
  • UNKNOW:开始事务回查,执行checkLocalTransaction方法

checkLocalTransaction()方法不同返回值场景说明:

  • COMMIT_MESSAGE:事务提交,消费方可以消费消息,不会再执行checkLocalTransaction方法
  • ROLLBACK_MESSAGE:事务回滚,消费方不会消费这条消息,不会再执行checkLocalTransaction方法
  • UNKNOW:事务回查,继续执行checkLocalTransaction方法

go示例

go mod

require (
    github.com/apache/rocketmq-client-go/v2 v2.1.0-rc3
)

示例代码

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
	"os"
	"strconv"
	"sync"
	"time"
)

type TransactionListener struct 
	transCache *sync.Map


func (listener *TransactionListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState 
	listener.transCache.Store(msg.TransactionId, true)

	fmt.Printf("执行本地事务")
	return primitive.UnknowState


func (listener *TransactionListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState 
	fmt.Printf("事务回查, msgID : %v\\n", msg.MsgId)
	execSuccess, existed := listener.transCache.LoadAndDelete(msg.TransactionId)
	if !existed 
		fmt.Printf("unknow msg: %v", msg)
		return primitive.UnknowState
		//return primitive.CommitMessageState
	
	if execSuccess.(bool) 
		fmt.Println("本地事务执行成功")
		return primitive.CommitMessageState
	 else 
		fmt.Println("本地事务执行失败")
		return primitive.RollbackMessageState
	


func main() 

	// 设置日志,默认使用logrus
	//rlog.SetLogger()
	nameSrv := []string"10.100.101.20:9876", "10.100.108.208:9876", "10.100.111.68:9876"

	p, _ := rocketmq.NewTransactionProducer(
		&TransactionListenertransCache: new(sync.Map),
		producer.WithNameServer(nameSrv), //设置name server地址,<=2.0.0版本,不支持域名
		//分别设置框架组提供的accessKey和secretKey
		producer.WithCredentials(primitive.CredentialsAccessKey: "rmq_access_key", SecretKey: "rmq_secret_key"),
		producer.WithTrace(&primitive.TraceConfigNamesrvAddrs: nameSrv), //启用消息轨迹,把该行代码注释掉,则不启用
		producer.WithGroupName("test_topic_producer"),                     //设置生产组名
	)

	// 启动producer
	err := p.Start()
	if err != nil 
		fmt.Printf("start producer error: %s", err.Error())
		os.Exit(1)
	
	// 换成自己业务使用的topic
	topic := "test_topic"

	for i := 0; i < 1; i++ 
		msg := &primitive.Message
			Topic: topic,
			Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)), //消息体
		
		// 设置tag,如果有需要再设置
		//msg.WithTag("tagA | tagB")
		// 设置消息的keys,建议设置成业务流水ID等字段可以用来标志业务唯一性的,消息的keys可以检索消息
		msg.WithKeys([]string"key" + strconv.Itoa(i))
		res, err := p.SendMessageInTransaction(context.Background(), msg)

		if err != nil 
			fmt.Printf("send message error: %s\\n", err)
		 else 
			fmt.Printf("send message success: result=%s\\n", res.String())
		
	
	time.Sleep(time.Minute * 10)
	if err = p.Shutdown(); err != nil 
		fmt.Printf("producer shutdown error: %s\\n", err)
	

关于代码中事务的几个返回字段和上面的java等同。

注意事项

  1. 同一类事务消息及相关topic,使用相同的生产组名称,进行事务回查的时候,broker端会根据生产者组名称查询相关生产者实例,如果所有的生产者都混用同一个生产组名称,那就查到其它实例上了
  2. 生产组保证最少有2个生产者实例,万一其中一个宕机,重启等broker回查还能找到另一个生产者实例,保证可用性
  3. 本地事务执行状态需要自己维护,可以考虑使用第三方存储介质,如:mysql
  4. 事务回查达到消息最大次数(默认15次,每次1分钟)便会丢弃该事务消息。用户可以通过 Broker 配置文件的 `transactionCheckMax`参数来修改此限制。

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指... 查看详情

rocketmq事务消息入门介绍(代码片段)

说明周五的时候发了篇:Rocketmq4.3支持事务啦!!!,趁着周末的时候把相关内容看了下,下面的主要内容就是关于RocketMQ事务相关内容介绍了。说明:今天这篇仅仅是入门介绍,并没有涉及到很多细节,先把大概流程说明白,... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,... 查看详情

rocketmq使用事务消息(代码片段)

目录说明原理事务消息处理流程生产端消费端说明事务消息:1、不支持延时消息和批量消息2、如果消息没有及时提交,默认check15次,可以通过Broker的transactionCheckMax参数配置次数。如果超时15次依然没有得到明确结果... 查看详情

30rocketmq事务消息的代码实现细节(代码片段)

...。1.发送half事务消息出去packagecom.mqTrsMessage;importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.clien 查看详情

rocketmq事务消息实战(代码片段)

RocketMQ事务消息阅读目录指引:RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想RocketMQ源码分析之RocketMQ事务消息实现原理上篇RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查RocketMQ源码分析... 查看详情

rocketmq的分布式事务机制(事务消息)(代码片段)

详细介绍了RocketMQ的事务消息机制,RocketMQ的事务消息可以用于实现基于可靠消息的最终一致性的分布式事务。文章目录1事务消息简要流程2一阶段半消息不可见的设计3二阶段Commit和Rollback操作4Op消息的设计5Commit消息变得可见... 查看详情

rocketmq源码分析之从官方示例窥探:rocketmq事务消息实现基本思想(代码片段)

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。官方版本未发布之前,从apacherocketmq第一个版本上线后,代码中存在与事务消息... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

搞懂分布式技术19:使用rocketmq事务消息解决分布式事务

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务初步认识RocketMQ的核心模块rocketmq模块rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。rocketmq-client:提供发送、接受消息的客... 查看详情

rocketmq事务消息原理(代码片段)

一、RocketMQ事务消息原理:        RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存... 查看详情

rocketmq实现事务消息方案(代码片段)

RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ之上,并且最近几年的双十一... 查看详情

rocketmq使用事务消息(代码片段)

...ate.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。原理事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:第一阶段:发送半事务消息,消息发送后,消... 查看详情

11springboot整合rocketmq实现事务消息(代码片段)

事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:Half(Prepare)Message——半消息(预处理... 查看详情

rocketmq事务消息原理及使用方法解析(代码片段)

...#x1f34a;Java学习:Java从入门到精通总结🍊深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想🍊绝对不一样的职场干货:大厂最佳实践经验指南📆最近更新:2023年3月24日🍊个人简介:通信工程... 查看详情

rocketmq事务消息机制(代码片段)

RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。一、事务消息的实现步骤事务消息发送步骤:1.发送方将半事务消息发送至RocketMQ服务端。2.RocketMQ服务端将消息持久... 查看详情

rocketmq事务消息

RocketMQ事务消息在实现上充分利用了RocketMQ本身机制,在实现零依赖的基础上,同样实现了高性能、可扩展、全异步等一系列特性。在具体实现上,RocketMQ通过使用HalfTopic以及OperationTopic两个内部队列来存储事务消息推进状态,如... 查看详情