关键词:
前言
本文继前两篇对事务消息源码进行分析。
事务消息处理基本流程
在介绍事务消息的时候,画了一个简单的流程图说明事务消息的整体处理流程:
p.s. 下面的序号(1、2、3...)表示顺序,与上图中的(1、2、3...)无关。
- 事务生产者调用事务消息发送接口,发送消息
- 开始预提交阶段,客户端发送预消息并在请求头标记这是一条事务消息。消息体就是我们实际要发送的消息内容
- broker接收到消息,发现这是一条事务消息,于是将当前消息备份。所谓“备份”即将当前消息的所有数据写入内部的事务topic中而不是我们实际要发送的topic,该事务topic由于消费端并没有订阅,所以这条消息对消费端不可见,然后响应客户端的发送请求
- 客户端确认发送成功,则执行本地事务,并标记事务执行状态。如果发送失败,就不需要执行本地事务了,直接标记事务执行失败,需要回滚。
- 基于事务的执行状态,给本次发送事务消息的那个broker发送一条结束事务的请求(请求头里包含是提交还是回滚,亦或者是未知状态)
- broker收到事务结束的请求,如果是未知状态就打条日志直接返回了;如果是提交事务,就将备份的那条事务消息恢复过来,写入到原始的topic里,此时就对消费端可见了,然后要在op队列里(另一个内部topic)写入一条消息,消息体就是当前这条事务消息的队列偏移值。如果是回滚事务,就只用在op队列里写入一条消息即可,就不还原事务消息了,这样对消费端就不可见。关于op队列的具体作用,后面源码部分再详说。
- 说一下事务回查。事务回查就是broker扫描到那些没有提交也没回滚的消息,找到客户端,发一个请求,让客户端再次提交一下事务结束状态。
源码剖析
整体流程涉及的代码还是比较多的,接下来对每一部分的源码拆开进行分析。
客户端处理,事务执行流程
客户端处理基本流程如下:
源码的主要入口实现部分在这个方法里:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException;
代码如下,我已经加上相关注释:
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException
// 获取我们创建的事务消息监听器(本地事务执行及事务回查)
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener)
throw new MQClientException("tranExecutor is null", null);
// 事务消息不支持延时消息
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0)
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// 设置半消息属性(TRAN_MSG),标志这是一个事务半消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
// 设置生产组属性(PGROUP)
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try
// 消息发送,这里就是平常同步发送普通消息的默认发送流程了,在客户端这里,就已经没有太多其它额外的针对事务消息的处理了,主要还是会判断是半消息的话,打个事务消息的标记
sendResult = this.send(msg);
catch (Exception e)
throw new MQClientException("send message Exception", e);
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus())
case SEND_OK:
try
// transactionId
if (sendResult.getTransactionId() != null)
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
// UNIQ_KEY,这是客户端发送的时侯生成的一个唯一ID,也就是我们平常用的sendResult里的msgId
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId))
// 在这种情况下,transactionId其实就是message的客户端msgId
msg.setTransactionId(transactionId);
// 一般,我平常使用的时候,不会采用localTransactionExecuter方式调用事务消息接口,所以这里一般是空
if (null != localTransactionExecuter)
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
else if (transactionListener != null)
log.debug("Used new transaction API");
// 执行我们本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
if (null == localTransactionState)
localTransactionState = LocalTransactionState.UNKNOW;
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE)
log.info("executeLocalTransactionBranch return ", localTransactionState);
log.info(msg.toString());
catch (Throwable e)
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
break;
// 未发送成功,回滚消息
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
try
// 事务结束处理,是提交还是回滚还是要做其它操作
this.endTransaction(msg, sendResult, localTransactionState, localException);
catch (Exception e)
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
// 返回事务消息发送结果,这里已经返回本地事务执行状态了
return transactionSendResult;
关于上面调用事务结束请求的方法,具体代码及注释如下:
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException
final MessageId id;
// getOffsetMsgId,这个是服务端的msgId,包含了不少消息的元信息
if (sendResult.getOffsetMsgId() != null)
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
else
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
String transactionId = sendResult.getTransactionId();
// 半消息发到了哪个broker上,最后提交也得到这个broker上
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
// 设置事务消息的提交偏移(提交到内部的事务topic上了)
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState)
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
// 竟然还带本地事务异常信息
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
// 2阶段执行的消息
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
通过查阅上面两个方法的代码基本对客户的事务消息发送部分,已经比较清楚了(事务回查的处理部分在后面)
broker端处理,接收事务半消息(预提交)
broker端在接收到事务消息的基本处理流程如下:
简单来说,事务消息也如普通消息一样发送到broker,broker像接收普通一样接收,接收到之后会判断是否有事务标记,有的话,就把这条消息的所有信息写入一个内部的事物topic里,来保证暂时对消费端不可见,关键源码如下(以异步写入为示例):
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader)
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
...
// 省略上面一部分代码,主要看下面判断这是一条事务消息
// 如果这个属性存在,说明是发送的事务消息
if (transFlag != null && Boolean.parseBoolean(transFlag))
// broker检查是否启用事务消息了
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage())
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
// 从这里可心看到,事务消息是一个单独的流程处理,和其它消息不一样
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
else
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
发现这是一条事务消息后,备份事务消息的代码如下:
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner)
// 会把消息的原始topic及队列信息存储到属性中,因为要写到事务topic的队列里,就是备份原消息
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
// 把事务标记也去掉
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 设置当前存储的消息的topic为:RMQ_SYS_TRANS_HALF_TOPIC, 事务半消息的topic
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
// 发送到队列0里, 这个topic也只有一条队列,另外还用到的一个topic是:RMQ_SYS_TRANS_OP_HALF_TOPIC,也是只有一条队列在每个broker上
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
然后就是将这条事务半消息如果普通消息一样写入到内部的事务topic里了
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner)
// 见名知义,事务半消息处理
// 事务半消息存储完成,基本半消息发送(一阶段)已经算是结束了,在写入commitlog的时候,基本没有对这个事务topic做额外处理了,就像普通消息那样了
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
broker端处理,事务结束
前面提到客户端在一阶段(发送事务半消息后,然后执行本地事务),会再根据事务执行状态给broker发送一条事务结束的请求,告诉broker是提交还是要回滚,基本流程如下:
事务处理上还是做了不少动作的,看一下它的关键源码实现:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException
// 事务消息结束(二阶段)处理
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.debug("Transaction request:", requestHeader);
// 从节点是不允许处理事务消息的
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole())
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
// 事务回查标记,是否为事务回查
if (requestHeader.getFromTransactionCheck())
switch (requestHeader.getCommitOrRollback())
case MessageSysFlag.TRANSACTION_NOT_TYPE:
LOGGER.warn("Check producer[] transaction state, but it's pending status."
+ "RequestHeader: Remark: ",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
return null;
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
LOGGER.warn("Check producer[] transaction state, the producer commit the message."
+ "RequestHeader: Remark: ",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
LOGGER.warn("Check producer[] transaction state, the producer rollback the message."
+ "RequestHeader: Remark: ",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
break;
default:
return null;
else
// 只是为了打条日志
switch (requestHeader.getCommitOrRollback())
// 本地事务执行状态返回的是UNKNOW,该回查了
case MessageSysFlag.TRANSACTION_NOT_TYPE:
LOGGER.warn("The producer[] end transaction in sending message, and it's pending status."
+ "RequestHeader: Remark: ",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
return null;
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
LOGGER.warn("The producer[] end transaction in sending message, rollback the message."
+ "RequestHeader: Remark: ",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
break;
default:
return null;
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback())
// 开始提交事务消息
// 这里就是根据之前提交的内部事务topic的半消息偏移,查出来提交的这条消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS)
// result.getPrepareMessage()就是之前提交到内部的事务topic上的那条半消息,检查下这条信息是否正确,日志偏移呀什么的是否匹配,是不是查错消息了
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS)
// 一切都OK了,准备提交事务,这里就是把原始消息信息,原原本本的恢复过来
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
// 清除事务消息属性
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 原始消息写入对应的topic,此时对消费端就可见了,可以正常消费了
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS)
// 删除的动作是在op队列(RMQ_SYS_TRANS_OP_HALF_TOPIC)写入该消息,tag是d,消息体是在事务topic里的消息偏移
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
return sendResult;
return res;
else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback())
// 事务回滚
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS)
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS)
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
return res;
/**
* 总结一下:
* commit:就是先把原始消息写入到原始topic里,然后删除半消息就是在op 的事务topic里写入一条tag为d的消息,消息体就是半消息的偏移值
* rollback: 就是直接删除过程了
* unknown: 就是上面 两步都没做,原始消息未写入,op队列里也没有
*/
// UNKNOW状态了,看来得回查了,其实这里返不返回都一样,客户端是one way调用
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
事务回查
前面的流程呢,都是事务的正常处理,但是如果客户端在发送事务请求的时候,宕机、重启、网络原因等,最终是导致事务结束的请求没有正确发送给broker处理,那就需要事务回查机制。
broker启动的时候,会启动一个定时任务(默认是1分钟),从前面提到的事务topic的队列里拉取消息,检查拉取到的消息是否已经处理过了(比如提交或回滚),如果没有,根据是否要进行事务回查,让客户再检查一下本地事务的执行状态并告诉broker或者丢弃。
其实这里涉及到几个关键问题需要明白:
- 写入到事务topic里的事务半消息在事务结束后进行删除,但是rocketmq是追加写的方式,所以这里的删除并不是从消息队列里真正的删除一条消息。
- broker怎么知道一条事务半消息是否已经提交或者回滚了,正如前面说的,这里引入一个op队列,即另一个内部topic,如果一条消息已经提交或回滚了,就向op队列里写入一条消息消息体就是在事务topic队列里的偏移值,如果op队列里没有,那就说明这条事务消息的状态还没有提交,还是未知的,可能需要事务回查。
- 我们知道写入到事务topic的事务半消息也如普通消息一样,是顺序写顺序读的,如果此时已经写入1、2、3、4、5、6共6条事务消息了,1、2、5的事务状态已经提交或者回滚了,但是3、4还是未知的,那总不能再重新回头消费吧。并没有,如果broker发现这条消息是未知状态的,那在处理的时候,把这条消息再追回写入到事务topic的队列里,然后找客户端回查。继续下一条消息处理,等到再处理到刚才重新追加的这条事务消息的时候,再从op队列里检查一下,这条事务半消息是否已经处理过了,如果还没有而且也没达到事务回查的最大次数,那就再追回写回去,再继续呗。如果已经达到最大次数,就丢弃(其实是写到另一个内部topic,也就是说事务消息这里用到了3个内部topic来存储数据)
关于事务回查这里主要采用文字描述说明了,就不再画一个流程图了,关键源码如下:
broker默认每分钟检查一次,从内部事务topic队列和op队列里拉取消息,然后比对,当前的事务半消息是否已经处理过了,是否需要回查:
// 定时检查事务消息(1分钟查一次)
@Override
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener)
try
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0)
log.warn("The queue of topic is empty :" + topic);
return;
log.debug("Check topic=, queues=", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues)
long startTime = System.currentTimeMillis();
// 一条预消息队列对应一个op队列(实际也就1条队列)
MessageQueue opQueue = getOpQueue(messageQueue);
// 获取事务topic和op topic的消费偏移
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue= msgOffset= opOffset=", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0)
log.error("MessageQueue: illegal offset read: , op offset: ,skip this queue", messageQueue,
halfOffset, opOffset);
continue;
List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult)
log.error("The queue= check msgOffset= with opOffset= failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
while (true)
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT)
log.info("Queue= process time reach max=", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
// 已经处理过的,没必要再处理一次
if (removeMap.containsKey(i))
log.debug("Half offset has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
// op的队列偏移
doneOpOffset.add(removedOpOffset);
else
// 获取当前要处理的half消息
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null)
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL)
break;
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG)
log.debug("No new msg, the miss offset= in=, continue check=, pull result=", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
else
log.info("Illegal offset, the miss offset= in=, continue check=, pull result=",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
// 超过15次丢弃,或者消息过期了(超过了设置的文件保存时间)
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt))
// 默认实现是移动到TRANS_CHECK_MAXTIME_TOPIC这个topic里
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
if (msgExt.getStoreTimestamp() >= startTime)
log.debug("Fresh stored. the miss offset=, check it later, store=", i,
new Date(msgExt.getStoreTimestamp()));
break;
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
// 未找到写入这个属性的地方(除了test)
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr)
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime)
// 超过这个检查时间,重新写回半消息队列
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt))
newOffset = i + 1;
i++;
continue;
else
// 新提交的半消息,暂不处理,估计是认为事务也可能没执行完,处理也没意义
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime))
log.debug("New arrived, the miss offset=, check it later checkImmunity=, born=", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
List<MessageExt> opMsg = pullResult.getMsgFoundList();
// checkImmunityTime默认是6秒,第一次可以检查的时间
// 正常来说,每条提交/回滚就是已经处理过的消息,在op队列里都有一条消息,如果没有(第一次回查),或者已经有了,但是存放时间已经满足检查条件了,都得回查
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck)
// 把这个消息重新写回half队列里
if (!putBackHalfMsgQueue(msgExt, i))
continue;
// 事务回查,确认状态后,下次再处理上边这个写回的半消息
listener.resolveHalfMsg(msgExt);
else
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset: in messageQueue: need to get more opMsg, result is:", i,
messageQueue, pullResult);
continue;
newOffset = i + 1;
i++;
if (newOffset != halfOffset)
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
//已经连接处理的偏移,如果2,3,4,6,7,则最偏移到4.
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset)
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
catch (Throwable e)
log.error("Check error", e);
// 总之,一条一条拉取,如果在op队列,就是已经commit或者rollback的,不用再管了,否则就检查是否需要回查,需要的话,这条写再写回half队列
关于如何从op队列里确认事务半消息已经处理过了,主要就是根据op队列里拉取的消息的消息体(保存的是事务半消息的偏移值)来判断当前偏移的事务消息是否已经处理过了:
/**
* Read op message, parse op message, and fill removeMap
*
* @param removeMap Half message to be remove, key:halfOffset, value: opOffset.
* @param opQueue Op message queue.
* @param pullOffsetOfOp The begin offset of op message queue.
* @param miniOffset The current minimum offset of half message queue.
* @param doneOpOffset Stored op messages that have been processed.
* @return Op message result.
*/
private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset)
// 使用CID_RMQ_SYS_TRANS拉取op队列里的消息
PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
if (null == pullResult)
return null;
if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL
|| pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG)
log.warn("The miss op offset= in queue= is illegal, pullResult=", pullOffsetOfOp, opQueue,
pullResult);
transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
return pullResult;
else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG)
log.warn("The miss op offset= in queue= is NO_NEW_MSG, pullResult=", pullOffsetOfOp, opQueue,
pullResult);
return pullResult;
List<MessageExt> opMsg = pullResult.getMsgFoundList();
if (opMsg == null)
log.warn("The miss op offset= in queue= is empty, pullResult=", pullOffsetOfOp, opQueue, pullResult);
return pullResult;
// 对拉取的消息做过滤处理,判断一下这些op消息对应的half消息是否处理过了
for (MessageExt opMessageExt : opMsg)
// 记录这条op消息对应在事务队列里的偏移值
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
log.debug("Topic: tags: , OpOffset: , HalfOffset: ", opMessageExt.getTopic(),
opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags()))
// 找到的都是需要"删除"的半消息
// miniOffset就是halfOffset,将要消费的最小偏移 ,这是处理完成待删除的op消息
if (queueOffset < miniOffset)
doneOpOffset.add(opMessageExt.getQueueOffset());
else
// op消息保存的是half消息的偏移,这个值竟然大于当前half消息的偏移,这是已经处理过的,不需要再处理了
removeMap.put(queueOffset, opMessageExt.getQueueOffset());
else
log.error("Found a illegal tag in opMessageExt= ", opMessageExt);
log.debug("Remove map: ", removeMap);
log.debug("Done op list: ", doneOpOffset);
return pullResult;
broker在该生产组下找到一个生产者客户端发送回查请求:
public void sendCheckMessage(MessageExt msgExt) throws Exception
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
// 根据生产组找到对应的生产者实例,发送一个回查请求
Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
if (channel != null)
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
else
LOGGER.warn("Check transaction failed, channel is null. groupId=", groupId);
客户端收到请求后,执行事务回查逻辑,并将事务状态发回broker:
// 事务反查
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException
final CheckTransactionStateRequestHeader requestHeader =
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null)
if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace()))
messageExt.setTopic(NamespaceUtil
.withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId))
// 就是客户端 msg Id,如果用户没有自定义设置这个值
messageExt.setTransactionId(transactionId);
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null)
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null)
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
producer.checkTransactionState(addr, messageExt, requestHeader);
else
log.debug("checkTransactionState, pick producer by group[] failed", group);
else
log.warn("checkTransactionState, pick producer group failed");
else
log.warn("checkTransactionState, decode message failed");
return null;
结语
关于rocketmq事务消息篇到此结束,后续如果有需要会再进行补充。
rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)
上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情
rocketmq源码分析之rocketmq事务消息实现原理中篇----事务消息状态回查(代码片段)
上节已经梳理了RocketMQ发送事务消息的流程(基于二阶段提交),本节将继续深入学习事务状态消息回查,我们知道,第一次提交到消息服务器时消息的主题被替换为RMQ_SYS_TRANS_HALF_TOPIC,本地事务执行完后如果返回本地事务状态... 查看详情
rocketmq源码分析之从官方示例窥探:rocketmq事务消息实现基本思想(代码片段)
RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以此通往RocketMQ事务消息的世界中。官方版本未发布之前,从apacherocketmq第一个版本上线后,代码中存在与事务消息... 查看详情
rocketmq事务消息实战(代码片段)
RocketMQ事务消息阅读目录指引:RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想RocketMQ源码分析之RocketMQ事务消息实现原理上篇RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查RocketMQ源码分析... 查看详情
rocketmq源码分析4-事务消息实现原理
为什么消息要具备事务能力参见还是比较清晰的。简单的说就是在你业务逻辑过程中,需要发送一条消息给订阅消息的人,但是期望是此逻辑过程完全成功完成之后才能使订阅者收到消息。业务逻辑过程假设是这样的:逻辑部分... 查看详情
rocketmq源码分析之rocketmq事务消息实现原下篇(事务提交或回滚)
本文将重点分析RocketMQBroker如何处理事务消息提交、回滚命令,根据前面的介绍,其入口EndTransactionProcessor#proce***equest:OperationResultresult=newOperationResult();if(MessageSysFlag.TRANSACTION_COMMIT_TYPE==requestHeader.getCommitOrRollback())//@1result=this.broke... 查看详情
消息中间件rocketmq源码解析:事务消息
650)this.width=650;"src="http://www.yunai.me/images/common/wechat_mp.jpeg"style="border:2pxsolidrgb(238,238,238);margin-top:0px;"/>关注微信公众号:【芋艿的后端小屋】有福利:RocketMQ/MyCAT/Sharding-JDBC 所有源码分析文章列表Rock 查看详情
rocketmq事务消息详解(代码片段)
...#x1f34a;Java学习:Java从入门到精通总结🍊深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想🍊绝对不一样的职场干货:大厂最佳实践经验指南📆最近更新:2023年4月9日🍊个人简介:通信工程本... 查看详情
rocketmq源码合集
消息队列中间件RocketMQ源码分析——Message发送与接收消息队列中间件RocketMQ源码分析——Message存储分布式消息队列RocketMQ源码分析——Message拉取与消费(上)分布式消息队列RocketMQ源码分析——Message拉取与消费(下... 查看详情
30rocketmq事务消息的代码实现细节(代码片段)
...。1.发送half事务消息出去packagecom.mqTrsMessage;importorg.apache.rocketmq.client.producer.DefaultMQProducer;importorg.apache.rocketmq.clien 查看详情
rocketmq事务消息入门介绍(代码片段)
说明周五的时候发了篇:Rocketmq4.3支持事务啦!!!,趁着周末的时候把相关内容看了下,下面的主要内容就是关于RocketMQ事务相关内容介绍了。说明:今天这篇仅仅是入门介绍,并没有涉及到很多细节,先把大概流程说明白,... 查看详情
rocketmq(09)——发送事务消息(代码片段)
发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,它还需要指... 查看详情
rocketmq源码之事务消息的回调方法应该怎么写?
两个回调方法:发送消息成功之后执行事务的executeLocalTransaction,回查时候调用的checkLocalTransaction。思路:执行事务的时候,调用service的方法,这个方法需要用事务注解,方法参数中传入唯一id,事务方法最后判断如果id不为空... 查看详情
源码分析rocketmq系列索引
1、RocketMQ源码分析之NameServer2、RocketMQ源码分析之Broker概述与同步消息发送原理与高可用设计及思考3、源码分析RocketMQ之CommitLog消息存储机制4、源码分析RocketMQ之消息消费5、源码分析RocketMQ消息消费机制----消费者拉取消息机制6、... 查看详情
rocketmq(09)——发送事务消息(代码片段)
发送事务消息RocketMQ支持发送事务消息,它的事务消息是基于二阶段提交机制实现的。当发送的消息是事务消息时,只有对应的消息被提交了才能被消费者进行消费。发送事务消息时生产者需要使用TransactionMQProducer,... 查看详情
rocketmq事务消息原理(代码片段)
一、RocketMQ事务消息原理: RocketMQ在4.3版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存... 查看详情
27发送消息零丢失方案:rocketmq事务消息的实现流程分析
...常见的网络故障之类的问题,导致消息就丢失了。在RocketMQ中,有这么一个功能,就是事务消息的功能,凭借这个事务级的消息机制,就可以让我们确保订单系统推送出去的消息一定会成功写入MQ里 查看详情
rocketmq实现事务消息方案(代码片段)
RocketMQ是一个来自阿里巴巴的分布式消息中间件,于2012年开源,并在2017年正式成为Apache顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在RocketMQ之上,并且最近几年的双十一... 查看详情