精华推荐|深入浅出rocketmq原理及实战「底层源码挖掘系列」透彻剖析贯穿rocketmq的消费者端的运行核心的流程(上篇)

author author     2023-01-26     288

关键词:

精华推荐 | 【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程

  • 上篇:分析对应总体消费流程的判断和校验以及限流控制和回调等处理流程分析
  • 下篇:分析基于上篇的总体流程的底层的消息通讯以及拉去处理数据传输流程分析

RocketMQ的消息模型

RocketMQ的基础消息模型是发布-订阅(Pub/Sub)是一种消息范式,消息的发送者(称为发布者、生产者、Producer)会将消息直接发送给特定的接收者(称为订阅者、消费者、Comsumer),如下图所示。

精华推荐

消息通过生产者发送到某一个Topic,如果需要订阅该Topic并消费里面的消息的话,就要创建对应的消费者进行消费,而本文主要会进行介绍对应的消息队列的消费者。

本文主旨

本文主要会针对于RocketMQ的消费者Consumer的功能原理进行分析和介绍,消费者主要会通过以推(push),拉(pull)两种模式对消息进行消费。同时也支持集群方式和广播方式的消费。提供实时消息订阅机制,可以满足大多数用户的需求。

RocketMQ提供Push模式也提供了Pull模式

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

Push模式处理消费消费

Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

DefaultMQPushConsumer的使用和初始化

Push模式主要通过初始化DefaultMQPushConsumer对象进行消费数据信息,案例代码如下所示。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

);
// 启动Consumer
consumer.start();
消费的位点配置

消费端的消费的位点计算值,可以在启动前进行配置,主要方法可以通过下面代码进行配置。

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
  • CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
  • CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费。

注意:第一次启动是指从来没有消费过消息的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始

消费的模式的配置

消费模式主要分为:集群消费(Clustering)和广播消费(Broadcasting)这两种。

  • 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
  • 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
consumer.setMessageModel(MessageModel.BROADCASTING);
  • CLUSTERING:默认模式,同一个ConsumerGroup,每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所订阅topic整体,从而达到负载均衡的目的。
  • BROADCASTING:同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。
DefaultMQPushConsumer的运行原理和流程

DefaultMQPushConsumerImpl中各个对象的主要功能如下:

平衡和分配队列组件实现类-RebalancePushImpl

RebalancePushImpl:主要负责进行分配对应当前服务实例的消费者会从当前消费的topic中的那个Queue中进行消费消息;此外当消费者宕机或者下线的时候,还会执行rebalance再次平衡和分配给其他消费者对应的队列控制。

长连接进行拉去消息组件实现类-PullAPIWrapper

PullAPIWrapper:主要与broker服务端建立长连接,一直进行定时从broker服务端处拉取消息数据,默认为:32条消息,之后还会调用ConsumeMessageService实现类,进行用户注册的Listener执行消息消费逻辑。

精华推荐

看一下consumer.registerMessageListener的源码,如下所示。

/**
* Register a callback to execute on message arrival for concurrent consuming.
* @param messageListener message handling callback.
*/
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener)
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
回调用户的注册的MessageListener组件实现类-ConsumeMessageService

ConsumeMessageService:实现所谓的"Push-被动"消费机制;从Broker拉取的消息后,封装成ConsumeRequest提交给ConsumeMessageSerivce,此service负责回调用户的Listener消费消息。

精华推荐

存储Offset的消费记录的位移组件实现类--OffsetStore

OffsetStore:维护当前consumer的消费记录(offset);有两种实现,Local和Rmote,Local存储在本地磁盘上,适用于BROADCASTING广播消费模式;而Remote则将消费进度存储在Broker上,适用于CLUSTERING集群消费模式;

精华推荐

综合门面功能接口供各个Service组件实现类--MQClientFactory

MQClientFactory:负责管理client(consumer、producer),并提供多中功能接口供各个Service(Rebalance、PullMessage等)调用;大部分逻辑均在这个类中完成,总体流程架构如下图所示。

精华推荐

DefaultMQPushConsumerImpl的start方法的源码

我们先来看一下对应的DefaultMQPushConsumerImpl类的start方法源码,源码可以看出主要实现过程在consumer.start后调用DefaultMQPushConsumerImpl的同步start方法,如下所示。

public synchronized void start() throws MQClientException 
switch (this.serviceState)
case CREATE_JUST:
log.info("the consumer [] start beginning. messageModel=, isUnitMode=", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING)
this.defaultMQPushConsumer.changeInstanceNameToPID();

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null)
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
else
switch (this.defaultMQPushConsumer.getMessageModel())
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;

this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly)
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently)
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());

this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK)
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);

mQClientFactory.start();
log.info("the consumer [] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;

this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
DefaultMQPushConsumerImpl的start方法的源码

主要我们能关注重点的代码和组件的start,通过mQClientFactory.start();发我们发现他调用了很多组件的start方法:

  • this.mQClientAPIImpl.start():主要用于开启请求-响应的网络通道对象。
  • this.startScheduledTask():主要开启多个定时任务的功能
  • this.pullMessageService.start():主要开启拉取数据的业务组件
  • this.rebalanceService.start():主要开启rebalance业务服务组件。
  • this.defaultMQProducer.getDefaultMQProducerImpl().start(false):开启push服务的对象组件作为门面。
public void start() throws MQClientException 
synchronized (this)
switch (this.serviceState)
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr())
this.mQClientAPIImpl.fetchNameServerAddr();

// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;


重点我们先来主要看pullMessageService.start(),通过这里我们发现RocketMQ的Push模式底层其实也是通过pull实现的,接下来我们先来分析一下pullMessageService中的pullMessage方法的源码。

精华推荐

private void pullMessage(final PullRequest pullRequest) 
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null)
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
else
log.warn("No matched consumer for the PullRequest , drop it", pullRequest);

DefaultMQPushConsumerImpl的pullMessage方法的源码

源码中需要进行根据消费组进行筛选对应的消费组,以方便选对应的消费组件DefaultMQPushConsumerImpl,如下图所示。

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

最后还是通过DefaultMQPushConsumerImpl类的pullMessage方法来进行消息的逻辑处理,

DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
DefaultMQPushConsumerImpl的逻辑限流控制流程

总结一下针对于限流的总体流程控制:

精华推荐

  1. 首先拉去消息数据的时候会先去判断对应的ProcessQueue的对象元素是否还存在订阅关系或者被删除了,从而进行跳过那些不应该被消费的数据。
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped())
log.info("the pull request[] is dropped.", pullRequest.toString());
return;

上面的逻辑是先会判断和校验PullRequest对象中的ProcessQueue对象的dropped是否为true(在RebalanceService线程中为topic下的MessageQueue创建拉取消息请求时要维护对应的ProcessQueue对象,若Consumer不再订阅该topic则会将该对象的dropped置为true);若是则认为该请求是已经取消的,则直接跳出该方法。


  1. 更新PullRequest对象中的ProcessQueue对象的时间戳(ProcessQueue.lastPullTimestamp)为当前时间戳。此外会判断当前的Consumer消费者组件是否运行中,主要是通过DefaultMQPushConsumerImpl.serviceState是否为RUNNING。
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try
this.makeSureStateOK();
catch (MQClientException e)
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;

if (this.isPause())
log.warn("consumer was paused, execute pull request later. instanceName=, group=", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;

如果运行状态或者是暂停状态this.isPause()=false(DefaultMQPushConsumerImpl.pause=true),则会进行执行​​PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)​​方法延迟再拉取消息,其中timeDelay=3000;

该方法的目的是在3秒之后再次将该PullRequest对象放入PullMessageService. pullRequestQueue队列中;并跳出该方法


  1. 主要进行消费者端进行速度和控制消费速度的流控。若ProcessQueue对象的msgCount大于了消费端的流控阈值,默认值为1000,主要通过​​DefaultMQPushConsumer.pullThresholdForQueue​​的执行进行判断。当调用的​​processQueue.getMsgCount().get()​​的数值大于​​DefaultMQPushConsumer.pullThresholdForQueue​​ 的值时候会进行 PullMessageService.executePullRequestLater方法。
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue())
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0)
log.warn(
"the cached message count exceeds the threshold , so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);

return;
  1. 主要进行消费者端进行速度和控制消费速度的流控。主要会通过​​ this.defaultMQPushConsumer.getPullThresholdSizeForQueue()​​与进行计算消息的内存空间的总大小进行对比,单位是M,当大于系统定义的​​ this.defaultMQPushConsumer.getPullThresholdSizeForQueue()​​的阈值大小的时候,则会进行限流处理。
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) 
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0)
log.warn(
"the cached message size exceeds the threshold MiB, so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);

return;

以上3和4步骤中的(DELAY_MILLS_WHEN_FLOW_CONTROL)50毫秒之后,才会将该PullRequest请求放入PullMessageService.pullRequestQueue队列中。从而实现看限流的能力。


  1. 当上面的直接限流之后,还会有跨度限流的控制,首先系统还会判断当前的消费方式是否顺序消费(即DefaultMQPushConsumerImpl.consumeOrderly等于false)。
if (!this.consumeOrderly) 
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan())
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0)
log.warn(
"the queues messages, span too long, so do flow control, minOffset=, maxOffset=, maxSpan=, pullRequest=, flowControlTimes=",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);

return;

则检查ProcessQueue对象的​​msgTreeMap:TreeMap<Long,MessageExt>​​ 变量的第一个key值与最后一个key值之间的的差值,该key值表示查询的队列偏移量queueoffset;

若queueoffset差值大于阈值(DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定,默认是2000),则调用PullMessageService.executePullRequestLater方法,在50毫秒之后再该PullRequest请求放入PullMessageService.pullRequestQueue队列中。


  1. PullRequest.messageQueue对象的topic值为参数RebalanceImpl.subscriptionInner,ConcurrentHashMap, SubscriptionData>中获取对应的SubscriptionData对象,若该对象为null,考虑到并发的关系,调用executePullRequestLater方法。
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData)
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumers subscription failed, ", pullRequest);
return;
DefaultMQPushConsumerImpl的消费模式分类

如果当前的消费者属于集群模式(RebalanceImpl.messageModel等于CLUSTERING)。

  • PullRequest对象的MessageQueue变量值,type =READ_FROM_MEMORY(从内存中获取消费进度offset值)为参数调用DefaultMQPushConsumerImpl的offsetStore对象。
  • 实际代表着RemoteBrokerOffsetStore对象的readOffset(MessageQueue mq, ReadOffsetType type)方法从本地内存中获取消费进度offset值。
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel())
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0)
commitOffsetEnable = true;

若该offset值大于0,则置临时变量commitOffsetEnable等于true否则为false;该offset值作为pullKernelImpl方法中的commitOffset参数,在Broker端拉取消息之后根据commitOffsetEnable参数值决定是否用该offset更新消息进度。

该readOffset方法的逻辑是:以入参MessageQueue对象从RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap <MessageQueue,AtomicLong>变量中获取消费进度偏移量;若该偏移量不为null则返回该值,否则返回-1;


DefaultMQPushConsumerImpl的订阅模型

当每次拉取消息之后需要更新订阅关系(由DefaultMQPushConsumer. postSubscriptionWhenPull参数表示,默认为false)并且以topic值参数从RebalanceImpl.subscriptionInner获取的SubscriptionData对象的classFilterMode等于false(默认为false),则将sysFlag标记的第3个字节置为1,否则该字节置为0;

SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null)
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode())
subExpression = sd.getSubString();

classFilter = sd.isClassFilterMode();

int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);

该sysFlag标记的第1个字节置为commitOffsetEnable的值;第2个字节(suspend标记)置为1;第4个字节置为classFilterMode的值。

DefaultMQPushConsumerImpl的底层客户端如何拉取消息的通信方法

调用底层的拉取消息API接口,方法进行消息拉取操作。

PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)

内部会存在将回调类PullCallback传入该方法中,当采用异步方式拉取消息时,在收到响应之后会回调该回调类的方法。

try 
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
// 消息的通信方式为异步
CommunicationMode.ASYNC,
pullCallback
);
catch (Exception e)
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);

初始化匿名内部类PullCallback,实现了onSuccess/onException方法; 该方法只有在异步请求的情况下才会回调;

PullCallback pullCallback = new PullCallback() 
@Override
public void onSuccess(PullResult pullResult)
if (pullResult != null)
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus())
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty())
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
else
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0)
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
else
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);


if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset)
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: firstMsgOffset: prevRequestOffset: ",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);

break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, ",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable()
@Override
public void run()
try
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, ", pullRequest);
catch (Throwable e)
log.error("executeTaskLater Exception", e);


, 10000);
break;
default:
break;



@Override
public void onException(Throwable e)
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
log.warn("execute the pull request exception", e);

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);

;

主要针对于拉去底层消息的状态进行分状态处理,针对于this.pullAPIWrapper.pullKernelImpl的方法,我会在【下篇】进行介绍,此处不做讲述分析。

此外针对于PullStatus状态的分析-FOUND状态的处理,主要更新本地的offset值,以及流程控制等。如下所示。

long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty())
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
else
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0)
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
else
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);


if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset)
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: firstMsgOffset: prevRequestOffset: ",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);

break;
  • 此外针对于PullStatus状态的分析-NO_NEW_MSG状态的处理,主要进行更新topic标签的offset的数据值、以及下一次拉去消息的offset值。
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
  • 此外针对于PullStatus状态的分析-NO_MATCHED_MSG:状态的处理,主要进行更新topic标签的offset的数据值、以及下一次拉去消息的offset值。
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
  • 此外针对于PullStatus状态的分析-OFFSET_ILLEGAL:状态的处理,重新同步远程的offset数据值。
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable()
@Override
public void run()
try
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, ", pullRequest);
catch (Throwable e)
log.error("executeTaskLater Exception", e);


, 10000);

精华推荐|深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的存储系统的实现原理和持久化机制

RocketMQ的发展历史RocketMQ是一个统一消息引擎、轻量级数据处理平台。RocketMQ是一款阿里巴巴开源的消息中间件。2016年11月28日,阿里巴巴向广西党性培训Apache软件基金会捐赠RocketMQ,成为Apache孵化项目。2017年9月25日,A... 查看详情

精华推荐|深入浅出rocketmq原理及实战「性能原理挖掘系列」透彻剖析贯穿rocketmq的系统服务底层原理以及高性能存储设计挖掘深入

设计背景消息中间件的本身定义来考虑,应该尽量减少对于外部第三方中间件的依赖。一般来说依赖的外部系统越多,也会使得本身的设计越复杂,采用文件系统作为消息存储的方式。RocketMQ存储机制消息中间件的存储一般都是... 查看详情

精华推荐|深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的broker服务端自动创建topic的原理分析和问题要点指南

前提背景使用RocketMQ进行发消息时,一般我们是必须要指定topic,此外topic必须要提前建立,但是topic的创建(自动或者手动方式)的设置有一个开关autoCreateTopicEnable,此部分主要会在broker节点的配置文件的时候进行设置,运行环... 查看详情

精华推荐|深入浅出rocketmq原理及实战「性能原理挖掘系列」透彻剖析贯穿rocketmq的系统服务底层原理以及高性能存储设计挖掘深入(代码片段)

设计背景消息中间件的本身定义来考虑,应该尽量减少对于外部第三方中间件的依赖。一般来说依赖的外部系统越多,也会使得本身的设计越复杂,采用文件系统作为消息存储的方式。RocketMQ存储机制消息中间件的存... 查看详情

精华推荐|深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的broker服务端自动创建topic的原理分析和问题要点指南(代码片段)

前提背景使用RocketMQ进行发消息时,一般我们是必须要指定topic,此外topic必须要提前建立,但是topic的创建(自动或者手动方式)的设置有一个开关autoCreateTopicEnable,此部分主要会在broker节点的配置文件的... 查看详情

深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的消息发送的全部流程和落盘原理分析指南(代码片段)

前言介绍RocketMQ目前在国内应该是比较流行的MQ了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问... 查看详情

精华推荐|深入浅出sentinel原理及实战「原理探索专题」完整剖析alibaba微服务架构体系之轻量级高可用流量控制组件sentinel

Sentinel是什么?不要概念混淆啊!注意:本Sentinel与Redis服务Sentinel是两回事,压根不是一个概念,请大家不要混肴。Alibaba的SentinelSentinel是由阿里巴巴中间件团队开发的开源项目,是一种面向分布式微服务... 查看详情

深度挖掘rocketmq底层源码「底层源码挖掘系列」透彻剖析贯穿rocketmq的消费者端的运行核心的流程(上篇)(代码片段)

精华推荐|【深入浅出RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程上篇:分析对应总体消费流程的判断和校验以及限流控制和回调等处理流程分析下篇:分析基于上篇的总体... 查看详情

深入浅出spring原理及实战「源码原理实战」从底层角度去分析研究propertysourcesplaceholderconfigurer的原理及实战注入机制

Spring提供配置解析功能主要有一下xml文件占位符解析和Java的属性@Value的占位符解析配置这两种场景进行分析和实现解析,如下面两种案例。xml文件的占位符解析配置<beanid="dataSource"class="com.alibaba.druid.pool.DruidDataSource"init-method="i... 查看详情

精华推荐|jvm深层系列「gc底层调优系列」一文带你彻底加强夯实底层原理之gc垃圾回收技术的分析指南(gc原理透析)

前提介绍很多小伙伴,都跟我反馈,说自己总是对JVM这一块的学习和认识不够扎实也不够成熟,因为JVM的一些特性以及运作机制总是混淆以及不确定,导致面试和工作实战中出现了很多的纰漏和短板,解决广大小伙伴痛点,我写... 查看详情

精华推荐|深入浅出学习透析nginx服务器的基本原理和配置指南「keepalive性能优化实战篇」(代码片段)

Linux系统:Centos7x64Nginx版本:1.11.5Nginx是一款面向性能设计的HTTP服务器,能反向代理HTTP,HTTPS和邮件相关(SMTP,POP3,IMAP)的协议链接。并且提供了负载均衡以及HTTP缓存。它的设计充分使用异步事件模型,... 查看详情

rocketmq架构原理精华分析

rocketMq架构原理精华分析是我们这篇文章的核心,从消息中间件的对比、架构模型、消息模型、常见问题等逐一分析:一、中间件对比:RabbitMq集群效果不太好,底层不是java语言,研究原理比较困难;Kafka... 查看详情

深入理解rocketmq---实战(控制台搭建)(代码片段)

 rocketMQ控制台搭建(1)下载rocketmq-console代码:https://github.com/875279177/incubator-rocketmq-externals(2)修改配置application配置文件,主要修改端口号及rocketmq.config.dataPathserver.contextPath=server.port=8080#spring.ap 查看详情

深入浅出springcloud原理及实战「netflix系列之hystrix」针对于限流熔断组件hystrix的基本参数和实现原理介绍分析(代码片段)

...此度过糟糕的一生。[温馨提示]承接第一篇文章🏹【深入浅出SpringCloud原理及实战】「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的基本参数和实现原理介绍分析在这里推荐给大家martinfowler的熔断器介绍和权威指南,有兴趣... 查看详情

深入浅出springcloud原理及实战「netflix系列之hystrix」针对于限流熔断组件hystrix的超时机制的原理和实现分析(代码片段)

...此度过糟糕的一生。[温馨提示]承接第一篇文章🏹【深入浅出SpringCloud原理及实战】「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的基本参数和实现原理介绍分析在这里推荐给大家martinfowler的熔断器介绍和权威指南,有兴趣... 查看详情

精华推荐|mysql技术专题「主从同步架构」全面详细透析mysql的三种主从复制(replication)机制的原理和实战开发(原理+实战)(代码片段)

前提概要随着应用业务数据不断的增大,应用的响应速度不断下降,在检测过程中我们不难发现大多数的请求都是查询操作。此时,我们可以将数据库扩展成主从复制模式,将读操作和写操作分离开来,多台... 查看详情

深入浅出dubbo3原理及实战「技术大纲」深入浅出并发实战课程系列及技术指南

Dubbo3开题简介如开篇所述,Dubbo提供了构建云原生微服务业务的一站式解决方案,可以使用Dubbo快速定义并发布微服务组件,同时基于Dubbo开箱即用的丰富特性及超强的扩展能力,构建运维整个微服务体系所需的各... 查看详情

深入理解rocketmq---实战(控制台搭建)(代码片段)

 rocketMQ控制台搭建(1)下载rocketmq-console代码:https://github.com/875279177/incubator-rocketmq-externals(2)修改配置application配置文件,主要修改端口号及rocketmq.config.dataPathserver.contextPath=server.port=8080#spring.application.index=truespring.application.n... 查看详情