rocketmq事务消息篇之事务消息源码分析(代码片段)

NetWhite NetWhite     2022-12-15     325

关键词:

前言

RocketMQ事务消息篇(一)之事务消息的介绍

RocketMQ事务消息篇(二)之事务消息的使用

本文继前两篇对事务消息源码进行分析。

事务消息处理基本流程

在介绍事务消息的时候,画了一个简单的流程图说明事务消息的整体处理流程:

p.s. 下面的序号(1、2、3...)表示顺序,与上图中的(1、2、3...)无关。

  1. 事务生产者调用事务消息发送接口,发送消息
  2. 开始预提交阶段,客户端发送预消息并在请求头标记这是一条事务消息。消息体就是我们实际要发送的消息内容
  3. broker接收到消息,发现这是一条事务消息,于是将当前消息备份。所谓“备份”即将当前消息的所有数据写入内部的事务topic中而不是我们实际要发送的topic,该事务topic由于消费端并没有订阅,所以这条消息对消费端不可见,然后响应客户端的发送请求
  4. 客户端确认发送成功,则执行本地事务,并标记事务执行状态。如果发送失败,就不需要执行本地事务了,直接标记事务执行失败,需要回滚。
  5. 基于事务的执行状态,给本次发送事务消息的那个broker发送一条结束事务的请求(请求头里包含是提交还是回滚,亦或者是未知状态)
  6. broker收到事务结束的请求,如果是未知状态就打条日志直接返回了;如果是提交事务,就将备份的那条事务消息恢复过来,写入到原始的topic里,此时就对消费端可见了,然后要在op队列里(另一个内部topic)写入一条消息,消息体就是当前这条事务消息的队列偏移值。如果是回滚事务,就只用在op队列里写入一条消息即可,就不还原事务消息了,这样对消费端就不可见。关于op队列的具体作用,后面源码部分再详说。
  7. 说一下事务回查。事务回查就是broker扫描到那些没有提交也没回滚的消息,找到客户端,发一个请求,让客户端再次提交一下事务结束状态。

源码剖析

整体流程涉及的代码还是比较多的,接下来对每一部分的源码拆开进行分析。

客户端处理,事务执行流程

客户端处理基本流程如下:

源码的主要入口实现部分在这个方法里:

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException;

 代码如下,我已经加上相关注释:

    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException 
        // 获取我们创建的事务消息监听器(本地事务执行及事务回查)
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) 
            throw new MQClientException("tranExecutor is null", null);
        

        // 事务消息不支持延时消息
        // ignore DelayTimeLevel parameter
        if (msg.getDelayTimeLevel() != 0) 
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        

        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        // 设置半消息属性(TRAN_MSG),标志这是一个事务半消息
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        // 设置生产组属性(PGROUP)
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try 
            // 消息发送,这里就是平常同步发送普通消息的默认发送流程了,在客户端这里,就已经没有太多其它额外的针对事务消息的处理了,主要还是会判断是半消息的话,打个事务消息的标记
            sendResult = this.send(msg);
         catch (Exception e) 
            throw new MQClientException("send message Exception", e);
        

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) 
            case SEND_OK: 
                try 
                    // transactionId
                    if (sendResult.getTransactionId() != null) 
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    
                    // UNIQ_KEY,这是客户端发送的时侯生成的一个唯一ID,也就是我们平常用的sendResult里的msgId
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) 
                        // 在这种情况下,transactionId其实就是message的客户端msgId
                        msg.setTransactionId(transactionId);
                    
                    // 一般,我平常使用的时候,不会采用localTransactionExecuter方式调用事务消息接口,所以这里一般是空
                    if (null != localTransactionExecuter) 
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                     else if (transactionListener != null) 
                        log.debug("Used new transaction API");
                        // 执行我们本地事务
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    
                    if (null == localTransactionState) 
                        localTransactionState = LocalTransactionState.UNKNOW;
                    

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) 
                        log.info("executeLocalTransactionBranch return ", localTransactionState);
                        log.info(msg.toString());
                    
                 catch (Throwable e) 
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                
            
            break;
            // 未发送成功,回滚消息
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        

        try 
            // 事务结束处理,是提交还是回滚还是要做其它操作
            this.endTransaction(msg, sendResult, localTransactionState, localException);
         catch (Exception e) 
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        

        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        // 返回事务消息发送结果,这里已经返回本地事务执行状态了
        return transactionSendResult;
    

关于上面调用事务结束请求的方法,具体代码及注释如下:

    public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException 
        final MessageId id;
        // getOffsetMsgId,这个是服务端的msgId,包含了不少消息的元信息
        if (sendResult.getOffsetMsgId() != null) 
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
         else 
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        
        String transactionId = sendResult.getTransactionId();
        // 半消息发到了哪个broker上,最后提交也得到这个broker上
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        // 设置事务消息的提交偏移(提交到内部的事务topic上了)
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) 
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        

        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        // 竟然还带本地事务异常信息
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        // 2阶段执行的消息
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    

通过查阅上面两个方法的代码基本对客户的事务消息发送部分,已经比较清楚了(事务回查的处理部分在后面) 

broker端处理,接收事务半消息(预提交)

broker端在接收到事务消息的基本处理流程如下:

简单来说,事务消息也如普通消息一样发送到broker,broker像接收普通一样接收,接收到之后会判断是否有事务标记,有的话,就把这条消息的所有信息写入一个内部的事物topic里,来保证暂时对消费端不可见,关键源码如下(以异步写入为示例):

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) 
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        ...
        // 省略上面一部分代码,主要看下面判断这是一条事务消息
        // 如果这个属性存在,说明是发送的事务消息
        if (transFlag != null && Boolean.parseBoolean(transFlag)) 
            // broker检查是否启用事务消息了
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) 
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            
            // 从这里可心看到,事务消息是一个单独的流程处理,和其它消息不一样
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
         else 
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    

 发现这是一条事务消息后,备份事务消息的代码如下:

    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) 
        // 会把消息的原始topic及队列信息存储到属性中,因为要写到事务topic的队列里,就是备份原消息
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        // 把事务标记也去掉
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        // 设置当前存储的消息的topic为:RMQ_SYS_TRANS_HALF_TOPIC, 事务半消息的topic
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        // 发送到队列0里, 这个topic也只有一条队列,另外还用到的一个topic是:RMQ_SYS_TRANS_OP_HALF_TOPIC,也是只有一条队列在每个broker上
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    

然后就是将这条事务半消息如果普通消息一样写入到内部的事务topic里了

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) 
        // 见名知义,事务半消息处理
        // 事务半消息存储完成,基本半消息发送(一阶段)已经算是结束了,在写入commitlog的时候,基本没有对这个事务topic做额外处理了,就像普通消息那样了
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    

broker端处理,事务结束

前面提到客户端在一阶段(发送事务半消息后,然后执行本地事务),会再根据事务执行状态给broker发送一条事务结束的请求,告诉broker是提交还是要回滚,基本流程如下:

事务处理上还是做了不少动作的,看一下它的关键源码实现:

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException 
        // 事务消息结束(二阶段)处理
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.debug("Transaction request:", requestHeader);
        // 从节点是不允许处理事务消息的
        if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) 
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        

        // 事务回查标记,是否为事务回查
        if (requestHeader.getFromTransactionCheck()) 
            switch (requestHeader.getCommitOrRollback()) 
                case MessageSysFlag.TRANSACTION_NOT_TYPE: 
                    LOGGER.warn("Check producer[] transaction state, but it's pending status."
                            + "RequestHeader:  Remark: ",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: 
                    LOGGER.warn("Check producer[] transaction state, the producer commit the message."
                            + "RequestHeader:  Remark: ",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());

                    break;
                

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: 
                    LOGGER.warn("Check producer[] transaction state, the producer rollback the message."
                            + "RequestHeader:  Remark: ",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                
                default:
                    return null;
            
         else 
            // 只是为了打条日志
            switch (requestHeader.getCommitOrRollback()) 
                // 本地事务执行状态返回的是UNKNOW,该回查了
                case MessageSysFlag.TRANSACTION_NOT_TYPE: 
                    LOGGER.warn("The producer[] end transaction in sending message,  and it's pending status."
                            + "RequestHeader:  Remark: ",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: 
                    break;
                

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: 
                    LOGGER.warn("The producer[] end transaction in sending message, rollback the message."
                            + "RequestHeader:  Remark: ",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                
                default:
                    return null;
            
        
        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) 
            // 开始提交事务消息
            // 这里就是根据之前提交的内部事务topic的半消息偏移,查出来提交的这条消息
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) 
                // result.getPrepareMessage()就是之前提交到内部的事务topic上的那条半消息,检查下这条信息是否正确,日志偏移呀什么的是否匹配,是不是查错消息了
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) 
                    // 一切都OK了,准备提交事务,这里就是把原始消息信息,原原本本的恢复过来
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    // 清除事务消息属性
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    // 原始消息写入对应的topic,此时对消费端就可见了,可以正常消费了
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) 
                        // 删除的动作是在op队列(RMQ_SYS_TRANS_OP_HALF_TOPIC)写入该消息,tag是d,消息体是在事务topic里的消息偏移
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    
                    return sendResult;
                
                return res;
            
         else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) 
            // 事务回滚
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) 
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) 
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                
                return res;
            
        
        /**
         * 总结一下:
         * commit:就是先把原始消息写入到原始topic里,然后删除半消息就是在op 的事务topic里写入一条tag为d的消息,消息体就是半消息的偏移值
         * rollback: 就是直接删除过程了
         * unknown: 就是上面 两步都没做,原始消息未写入,op队列里也没有
         */
        // UNKNOW状态了,看来得回查了,其实这里返不返回都一样,客户端是one way调用
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    

事务回查

前面的流程呢,都是事务的正常处理,但是如果客户端在发送事务请求的时候,宕机、重启、网络原因等,最终是导致事务结束的请求没有正确发送给broker处理,那就需要事务回查机制。

broker启动的时候,会启动一个定时任务(默认是1分钟),从前面提到的事务topic的队列里拉取消息,检查拉取到的消息是否已经处理过了(比如提交或回滚),如果没有,根据是否要进行事务回查,让客户再检查一下本地事务的执行状态并告诉broker或者丢弃。

其实这里涉及到几个关键问题需要明白:

  1. 写入到事务topic里的事务半消息在事务结束后进行删除,但是rocketmq是追加写的方式,所以这里的删除并不是从消息队列里真正的删除一条消息。
  2. broker怎么知道一条事务半消息是否已经提交或者回滚了,正如前面说的,这里引入一个op队列,即另一个内部topic,如果一条消息已经提交或回滚了,就向op队列里写入一条消息消息体就是在事务topic队列里的偏移值,如果op队列里没有,那就说明这条事务消息的状态还没有提交,还是未知的,可能需要事务回查。
  3. 我们知道写入到事务topic的事务半消息也如普通消息一样,是顺序写顺序读的,如果此时已经写入1、2、3、4、5、6共6条事务消息了,1、2、5的事务状态已经提交或者回滚了,但是3、4还是未知的,那总不能再重新回头消费吧。并没有,如果broker发现这条消息是未知状态的,那在处理的时候,把这条消息再追回写入到事务topic的队列里,然后找客户端回查。继续下一条消息处理,等到再处理到刚才重新追加的这条事务消息的时候,再从op队列里检查一下,这条事务半消息是否已经处理过了,如果还没有而且也没达到事务回查的最大次数,那就再追回写回去,再继续呗。如果已经达到最大次数,就丢弃(其实是写到另一个内部topic,也就是说事务消息这里用到了3个内部topic来存储数据)

关于事务回查这里主要采用文字描述说明了,就不再画一个流程图了,关键源码如下:

broker默认每分钟检查一次,从内部事务topic队列和op队列里拉取消息,然后比对,当前的事务半消息是否已经处理过了,是否需要回查:

    // 定时检查事务消息(1分钟查一次)
    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) 
        try 
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) 
                log.warn("The queue of topic is empty :" + topic);
                return;
            
            log.debug("Check topic=, queues=", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) 
                long startTime = System.currentTimeMillis();
                // 一条预消息队列对应一个op队列(实际也就1条队列)
                MessageQueue opQueue = getOpQueue(messageQueue);
                // 获取事务topic和op topic的消费偏移
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue= msgOffset= opOffset=", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) 
                    log.error("MessageQueue:  illegal offset read: , op offset: ,skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                

                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) 
                    log.error("The queue= check msgOffset= with opOffset= failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                
                // single thread
                int getMessageNullCount = 1;
                long newOffset = halfOffset;
                long i = halfOffset;
                while (true) 
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) 
                        log.info("Queue= process time reach max=", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    
                    // 已经处理过的,没必要再处理一次
                    if (removeMap.containsKey(i)) 
                        log.debug("Half offset  has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        // op的队列偏移
                        doneOpOffset.add(removedOpOffset);
                     else 
                        // 获取当前要处理的half消息
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        if (msgExt == null) 
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) 
                                break;
                            
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) 
                                log.debug("No new msg, the miss offset= in=, continue check=, pull result=", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                             else 
                                log.info("Illegal offset, the miss offset= in=, continue check=, pull result=",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            
                        

                        // 超过15次丢弃,或者消息过期了(超过了设置的文件保存时间)
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) 
                            // 默认实现是移动到TRANS_CHECK_MAXTIME_TOPIC这个topic里
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        
                        if (msgExt.getStoreTimestamp() >= startTime) 
                            log.debug("Fresh stored. the miss offset=, check it later, store=", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        

                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        long checkImmunityTime = transactionTimeout;
                        // 未找到写入这个属性的地方(除了test)
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        if (null != checkImmunityTimeStr) 
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            if (valueOfCurrentMinusBorn < checkImmunityTime) 
                                // 超过这个检查时间,重新写回半消息队列
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) 
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                
                            
                         else 
                            // 新提交的半消息,暂不处理,估计是认为事务也可能没执行完,处理也没意义
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) 
                                log.debug("New arrived, the miss offset=, check it later checkImmunity=, born=", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            
                        
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        // checkImmunityTime默认是6秒,第一次可以检查的时间
                        // 正常来说,每条提交/回滚就是已经处理过的消息,在op队列里都有一条消息,如果没有(第一次回查),或者已经有了,但是存放时间已经满足检查条件了,都得回查
                        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                            || (valueOfCurrentMinusBorn <= -1);

                        if (isNeedCheck) 
                            // 把这个消息重新写回half队列里
                            if (!putBackHalfMsgQueue(msgExt, i)) 
                                continue;
                            
                            // 事务回查,确认状态后,下次再处理上边这个写回的半消息
                            listener.resolveHalfMsg(msgExt);
                         else 
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset: in messageQueue: need to get more opMsg, result is:", i,
                                messageQueue, pullResult);
                            continue;
                        
                    
                    newOffset = i + 1;
                    i++;
                
                if (newOffset != halfOffset) 
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                
                //已经连接处理的偏移,如果2,3,4,6,7,则最偏移到4.
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) 
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                
            
         catch (Throwable e) 
            log.error("Check error", e);
        

        // 总之,一条一条拉取,如果在op队列,就是已经commit或者rollback的,不用再管了,否则就检查是否需要回查,需要的话,这条写再写回half队列

关于如何从op队列里确认事务半消息已经处理过了,主要就是根据op队列里拉取的消息的消息体(保存的是事务半消息的偏移值)来判断当前偏移的事务消息是否已经处理过了:

    /**
     * Read op message, parse op message, and fill removeMap
     *
     * @param removeMap Half message to be remove, key:halfOffset, value: opOffset.
     * @param opQueue Op message queue.
     * @param pullOffsetOfOp The begin offset of op message queue.
     * @param miniOffset The current minimum offset of half message queue.
     * @param doneOpOffset Stored op messages that have been processed.
     * @return Op message result.
     */
    private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
        MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) 
        // 使用CID_RMQ_SYS_TRANS拉取op队列里的消息
        PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
        if (null == pullResult) 
            return null;
        
        if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL
            || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) 
            log.warn("The miss op offset= in queue= is illegal, pullResult=", pullOffsetOfOp, opQueue,
                pullResult);
            transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
            return pullResult;
         else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) 
            log.warn("The miss op offset= in queue= is NO_NEW_MSG, pullResult=", pullOffsetOfOp, opQueue,
                pullResult);
            return pullResult;
        
        List<MessageExt> opMsg = pullResult.getMsgFoundList();
        if (opMsg == null) 
            log.warn("The miss op offset= in queue= is empty, pullResult=", pullOffsetOfOp, opQueue, pullResult);
            return pullResult;
        
        // 对拉取的消息做过滤处理,判断一下这些op消息对应的half消息是否处理过了
        for (MessageExt opMessageExt : opMsg) 
            // 记录这条op消息对应在事务队列里的偏移值
            Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
            log.debug("Topic:  tags: , OpOffset: , HalfOffset: ", opMessageExt.getTopic(),
                opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
            if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) 
                // 找到的都是需要"删除"的半消息
                // miniOffset就是halfOffset,将要消费的最小偏移 ,这是处理完成待删除的op消息
                if (queueOffset < miniOffset) 
                    doneOpOffset.add(opMessageExt.getQueueOffset());
                 else 
                    // op消息保存的是half消息的偏移,这个值竟然大于当前half消息的偏移,这是已经处理过的,不需要再处理了
                    removeMap.put(queueOffset, opMessageExt.getQueueOffset());
                
             else 
                log.error("Found a illegal tag in opMessageExt=  ", opMessageExt);
            
        
        log.debug("Remove map: ", removeMap);
        log.debug("Done op list: ", doneOpOffset);
        return pullResult;
    

broker在该生产组下找到一个生产者客户端发送回查请求:

    public void sendCheckMessage(MessageExt msgExt) throws Exception 
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        // 根据生产组找到对应的生产者实例,发送一个回查请求
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if (channel != null) 
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
         else 
            LOGGER.warn("Check transaction failed, channel is null. groupId=", groupId);
        
    

客户端收到请求后,执行事务回查逻辑,并将事务状态发回broker:

    // 事务反查
    public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException 
        final CheckTransactionStateRequestHeader requestHeader =
            (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
        final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
        final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
        if (messageExt != null) 
            if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) 
                messageExt.setTopic(NamespaceUtil
                    .withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
            
            String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) 
                // 就是客户端 msg Id,如果用户没有自定义设置这个值
                messageExt.setTransactionId(transactionId);
            
            final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
            if (group != null) 
                MQProducerInner producer = this.mqClientFactory.selectProducer(group);
                if (producer != null) 
                    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    producer.checkTransactionState(addr, messageExt, requestHeader);
                 else 
                    log.debug("checkTransactionState, pick producer by group[] failed", group);
                
             else 
                log.warn("checkTransactionState, pick producer group failed");
            
         else 
            log.warn("checkTransactionState, decode message failed");
        

        return null;
    

结语

关于rocketmq事务消息篇到此结束,后续如果有需要会再进行补充。

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)

上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情

rocketmq源码分析之从官方示例窥探:rocketmq事务消息实现基本思想(代码片段)

RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。官方版本未发布之前,从apacherocketmq第一个版本上线后,代码中存在与事务消息... 查看详情

rocketmq事务消息实战(代码片段)

RocketMQ事务消息阅读目录指引:RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想RocketMQ源码分析之RocketMQ事务消息实现原理上篇RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查RocketMQ源码分析... 查看详情

rocketmq源码分析4-事务消息实现原理

为什么消息要具备事务能力参见还是比较清晰的。简单的说就是在你业务逻辑过程中,需要发送一条消息给订阅消息的人,但是期望是此逻辑过程完全成功完成之后才能使订阅者收到消息。业务逻辑过程假设是这样的:逻辑部分... 查看详情

rocketmq源码分析之rocketmq事务消息实现原下篇(事务提交或回滚)

本文将重点分析RocketMQBroker如何处理事务消息提交、回滚命令,根据前面的介绍,其入口EndTransactionProcessor#proce***equest:OperationResultresult=newOperationResult();if(MessageSysFlag.TRANSACTION_COMMIT_TYPE==requestHeader.getCommitOrRollback())//@1result=this.broke... 查看详情

消息中间件rocketmq源码解析:事务消息

650)this.width=650;"src="http://www.yunai.me/images/common/wechat_mp.jpeg"style="border:2pxsolidrgb(238,238,238);margin-top:0px;"/>关注微信公众号:【芋艿的后端小屋】有福利:RocketMQ/MyCAT/Sharding-JDBC 所有源码分析文章列表Rock 查看详情

rocketmq事务消息详解(代码片段)

...#x1f34a;Java学习:Java从入门到精通总结🍊深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想🍊绝对不一样的职场干货:大厂最佳实践经验指南📆最近更新:2023年4月9日🍊个人简介:通信工程本... 查看详情

rocketmq源码合集

消息队列中间件RocketMQ源码分析——Message发送与接收消息队列中间件RocketMQ源码分析——Message存储分布式消息队列RocketMQ源码分析——Message拉取与消费(上)分布式消息队列RocketMQ源码分析——Message拉取与消费(下&#x... 查看详情

30rocketmq事务消息的代码实现细节(代码片段)

...。1.发送half事务消息出去packagecom.mqTrsMessage;importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.clien 查看详情

rocketmq事务消息入门介绍(代码片段)

说明周五的时候发了篇:Rocketmq4.3支持事务啦!!!,趁着周末的时候把相关内容看了下,下面的主要内容就是关于RocketMQ事务相关内容介绍了。说明:今天这篇仅仅是入门介绍,并没有涉及到很多细节,先把大概流程说明白,... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指... 查看详情

rocketmq源码之事务消息的回调方法应该怎么写?

两个回调方法:发送消息成功之后执行事务的executeLocalTransaction,回查时候调用的checkLocalTransaction。思路:执行事务的时候,调用service的方法,这个方法需要用事务注解,方法参数中传入唯一id,事务方法最后判断如果id不为空... 查看详情

源码分析rocketmq系列索引

1、RocketMQ源码分析之NameServer2、RocketMQ源码分析之Broker概述与同步消息发送原理与高可用设计及思考3、源码分析RocketMQ之CommitLog消息存储机制4、源码分析RocketMQ之消息消费5、源码分析RocketMQ消息消费机制----消费者拉取消息机制6、... 查看详情

rocketmq(09)——发送事务消息(代码片段)

发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,... 查看详情

rocketmq事务消息原理(代码片段)

一、RocketMQ事务消息原理:        RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存... 查看详情

27发送消息零丢失方案:rocketmq事务消息的实现流程分析

...常见的网络故障之类的问题,导致消息就丢失了。在RocketMQ中,有这么一个功能,就是事务消息的功能,凭借这个事务级的消息机制,就可以让我们确保订单系统推送出去的消息一定会成功写入MQ里 查看详情

rocketmq实现事务消息方案(代码片段)

RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ之上,并且最近几年的双十一... 查看详情