rocketmq专题2:三种常用生产消费方式(顺序广播定时)以及顺序消费源码探究(代码片段)

kidezyq kidezyq     2023-01-01     432

关键词:

顺序、广播、定时任务

前插

? 在进行常用的三种消息类型例子展示的时候,我们先来说一说RocketMQ的几个重要概念:

  • PullConsumer与PushConsumer:主要区别在于Pull与Push的区别。对于PullConsumer,消费者会主动从broker中拉取消息进行消费。而对于PushConsumer,会封装包含消息获取、消息处理以及其他相关操作的接口给程序调用
  • Tag: Tag可以看做是一个子主题(sub-topic),可以进一步细化主题下的相关子业务。提高程序的灵活性和可扩展性
  • Broker:RocketMQ的核心组件之一。用来从生产者处接收消息,存储消息以及将消息推送给消费者。同时RocketMQ的broker也用来存储消息相关的数据,比如消费者组、消费处理的偏移量、主题以及消息队列等
  • Name Server: 可以看做是一个信息路由器。生产者和消费者从NameServer中查找对应的主题以及相应的broker

实例

? 这里我们不玩虚的,直接将三个类型的生产者,消费者代码实例给出(在官网给出的例子上做了些许改动和注释说明):

生产者代码

/**
 * 多种类型组合消息测试
 * @author ziyuqi
 *
 */
public class MultiTypeProducer 
    public static void main(String[] args) throws Exception 
        // 顺序消息生产者  FIFO
        OrderedProducer orderedProducer = new OrderedProducer();
        orderedProducer.produce();
        
        // 广播消息生产者
        /*BroadcastProducer broadcastProducer = new BroadcastProducer();
        broadcastProducer.produce();*/
        
        // 定时任务消息生产者
        /*ScheduledProducer scheduledProducer = new ScheduledProducer();
        scheduledProducer.produce();*/
    


/**
 * 按顺序发送消息的生产者 
 * @author ziyuqi
 *
 */
class OrderedProducer 
    public void produce() throws Exception 
        DefaultMQProducer producer = new DefaultMQProducer("GroupD");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        String[] tags = new String[] "tagA", "tagB", "tagC", "tagD", "tagE";
        for (int i=0; i<50; i++) 
            Message message = new Message("OrderedTopic", tags[i % tags.length], "KEY" + i, ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message, new MessageQueueSelector() 
                /**
                 * 所谓的顺序,只能保证同一MessageQueue放入的消息满足FIFO。该方法返回应该将消息放入那个MessageQueue,最后一个参数为send传入的最后一个参数
                 * 如果需要全局保持FIFO,则所有消息应该依次放入同一队列中去mqs队列中的同一下标
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 
                    // 消息被分开放入多个队列,每个队列中的消息保证按顺序被消费FIFO
                    /*int index = (Integer) arg % mqs.size();
                    System.out.println("QueueSize:" + mqs.size());
                    return mqs.get(index);*/
                    
                    // 消息全部放入同一队列,全局保持顺序性 
                    return mqs.get(0);
                
            , i);
            System.out.println(sendResult);
        
        producer.shutdown();
    


/**
 * 广播生产者
 * @author ziyuqi
 *
 */
class BroadcastProducer 
    public void produce() throws Exception 
        DefaultMQProducer producer = new DefaultMQProducer("GroupA");
        // 也必须设置nameServer
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i=0; i<50; i++) 
            Message message = new Message("BroadcastTopic", "tagA", "OrderID188", ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        
        producer.shutdown();
    


/**
 * 定时消息发送者
 * @author ziyuqi
 *
 */
class ScheduledProducer 
    public void produce() throws Exception 
        DefaultMQProducer producer = new DefaultMQProducer("GroupA");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i=0; i<50; i++) 
            Message message = new Message("scheduledTopic", ("Message:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 设置投递的延迟时间
            message.setDelayTimeLevel(3);
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        
        producer.shutdown();
    

消费者代码

public class MultiTypeConsumer 
    public static void main(String[] args) throws Exception 
        // 按顺序消费者
        OrderedConsumer orderedConsumer = new OrderedConsumer();
        orderedConsumer.consume();
        
        // 广播消费者
        /*BroadcastConsumer broadcastConsumer = new BroadcastConsumer();
        broadcastConsumer.consume();*/
        
        // 定时任务消费者
        /*ScheduledConsumer scheduledConsumer = new ScheduledConsumer();
        scheduledConsumer.consume();*/
    



/**
 * 按顺序的消费者
 * @author ziyuqi
 *
 */
class OrderedConsumer 
    public void consume() throws Exception 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupD");
        /*
         *  设置从哪里开始消费 :
         *  当设置为: ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET 
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);       
        consumer.setNamesrvAddr("localhost:9876");
        // 设置定于的主题和tag(必须显示指定tag)
        consumer.subscribe("OrderedTopic", "tagA || tagB || tagC || tagD || tagE");                     
        
        consumer.setMessageListener(new MessageListenerOrderly() 
            AtomicLong num = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) 
                /**
                 *  设置是否自动提交: 默认自动提交,提交之后消息就不能够被再次消费。
                 *  非自动提交时,消息可能会被重复消费
                 */
                context.setAutoCommit(false);
                this.num.incrementAndGet();
                try 
                    for (MessageExt msg : msgs) 
                        System.out.println("Received:num=" + this.num.get() +", queueId=" + msg.getQueueId() +  ", Keys=" + msg.getKeys() + ", value=" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    
                 catch (UnsupportedEncodingException e) 
                    e.printStackTrace();
                
                /*try 
                    TimeUnit.SECONDS.sleep(1);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                */
                if (this.num.get() % 3 == 0) 
                    // return ConsumeOrderlyStatus.ROLLBACK;
                 else if (this.num.get() % 4 == 0) 
                    return ConsumeOrderlyStatus.COMMIT;
                 else if (this.num.get() % 5 == 0) 
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                
            
                // 非主动提交的时候,SUCCESS不会导致队列消息提交,消息未提交就可以被循环消费
                return ConsumeOrderlyStatus.SUCCESS;
            
        );
        consumer.start();
    



class BroadcastConsumer 
    public void consume() throws Exception 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 即使是广播形式下,nameServer还是要设置
        consumer.setNamesrvAddr("localhost:9876");
        // 设置消费的消息类型为广播类消息
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("BroadcastTopic", "tagA || tagB || tagC");
        consumer.registerMessageListener(new MessageListenerConcurrently() 
            
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                try 
                    for (MessageExt msg : msgs) 
                        System.out.println("Received:" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    
                 catch (UnsupportedEncodingException e) 
                    e.printStackTrace();
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        consumer.start();
    


/**
 * 定时任务消费者
 * @author ziyuqi
 *
 */
class ScheduledConsumer 
    public void consume() throws Exception 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA");
        consumer.setNamesrvAddr("localhost:9876");  
        consumer.subscribe("scheduledTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                for (MessageExt msg : msgs) 
                    try 
                        System.out.println("Received:[" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET) + "]" + (System.currentTimeMillis() - msg.getStoreTimestamp()) + " ms later!");
                     catch (UnsupportedEncodingException e) 
                        e.printStackTrace();
                    
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        consumer.start();
    

源码与实例分析

? 结合我上面的测试代码,以及我在测试中主要针对顺序消费的疑惑和源码调试。我这里简单分析下顺序消费者的相关执行过程,大致的执行步骤如下:

消费者启动

? 我们知道每次consumer创建之后,都会调用consumer.start()方法来启动消费者。跟进代码嵌套,不难发现最终会进入DefaultMQPushConsumerImplstart方法中,该方法的主要代码如下:

 public synchronized void start() throws MQClientException 
     switch (this.serviceState) 
             // 消费者启动状态满足Create_just
         case CREATE_JUST:
             log.info("the consumer [] start beginning. messageModel=, isUnitMode=", this.defaultMQPushConsumer.getConsumerGroup(),
                      this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
             this.serviceState = ServiceState.START_FAILED;
             // 配置检查
             this.checkConfig();

             this.copySubscription();

             if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) 
                 this.defaultMQPushConsumer.changeInstanceNameToPID();
             

             this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

             this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
             this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
             this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
             this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

             this.pullAPIWrapper = new PullAPIWrapper(
                 mQClientFactory,
                 this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
             this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

             if (this.defaultMQPushConsumer.getOffsetStore() != null) 
                 this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
              else 
                 switch (this.defaultMQPushConsumer.getMessageModel()) 
                     case BROADCASTING:
                         this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                         break;
                     case CLUSTERING:
                         this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                         break;
                     default:
                         break;
                 
                 this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
             
             this.offsetStore.load();

             if (this.getMessageListenerInner() instanceof MessageListenerOrderly) 
                 this.consumeOrderly = true;
                 this.consumeMessageService =
                     new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
              else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) 
                 this.consumeOrderly = false;
                 this.consumeMessageService =
                     new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
             

             this.consumeMessageService.start();

             boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
             if (!registerOK) 
                 this.serviceState = ServiceState.CREATE_JUST;
                 this.consumeMessageService.shutdown();
                 throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                                             + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                             null);
             
             // 主要方法在这,启动MQ客户端工厂,进行消息拉取
             mQClientFactory.start();
             log.info("the consumer [] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
             this.serviceState = ServiceState.RUNNING;
             break;
         case RUNNING:
         case START_FAILED:
         case SHUTDOWN_ALREADY:
             throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                                         + this.serviceState
                                         + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                         null);
         default:
             break;
     

     this.updateTopicSubscribeInfoWhenSubscriptionChanged();
     this.mQClientFactory.checkClientInBroker();
     this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
     this.mQClientFactory.rebalanceImmediately();
 

?

MQClient启动

? 上一段源码我们发现最终调用了mQClientFactory.start();.我们继续跟进该方法,发现实际调用的是MQClientInstance.start()

 public void start() throws MQClientException 

     synchronized (this) 
         switch (this.serviceState) 
             case CREATE_JUST:
                 this.serviceState = ServiceState.START_FAILED;
                 // If not specified,looking address from name server
                 if (null == this.clientConfig.getNamesrvAddr()) 
                     this.mQClientAPIImpl.fetchNameServerAddr();
                 
                 // Start request-response channel
                 this.mQClientAPIImpl.start();
                 // Start various schedule tasks
                 this.startScheduledTask();
                 // Start pull service 关键点在这调用了pullMessageService的start方法
                 this.pullMessageService.start();
                 // Start rebalance service
                 this.rebalanceService.start();
                 // Start push service
                 this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                 log.info("the client factory [] start OK", this.clientId);
                 this.serviceState = ServiceState.RUNNING;
                 break;
             case RUNNING:
                 break;
             case SHUTDOWN_ALREADY:
                 break;
             case START_FAILED:
                 throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
             default:
                 break;
         
     
 

消息拉取

? 根据上一段代码的注释,我们进入到核心的消息推送代码PullMessageServicestart方法(实际上PullMessage继承自Thread类,调用的是run方法):

@Override
public void run() 
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) 
        try 
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);  // 重点转移到该方法具体推送实现
         catch (InterruptedException ignored) 
         catch (Exception e) 
            log.error("Pull Message Service Run Method exception", e);
        
    

    log.info(this.getServiceName() + " service end");


private void pullMessage(final PullRequest pullRequest) 
        final MQConsumerInner consumer =         this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) 
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);      // 调用默认的拉消息消费者实现
     else 
        log.warn("No matched consumer for the PullRequest , drop it", pullRequest);
    

? 我们继续跟进DefaultMQPushConsumerImplpullMessage方法:

 public void pullMessage(final PullRequest pullRequest) 
     // ... 省略

     final long beginTimestamp = System.currentTimeMillis();
     // 该回调函数实际是对消息消费的具体处理
     PullCallback pullCallback = new PullCallback() 
         @Override
         public void onSuccess(PullResult pullResult) 
             if (pullResult != null) 
                 pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                                                                                              subscriptionData);

                 switch (pullResult.getPullStatus()) 
                     case FOUND:
                         long prevRequestOffset = pullRequest.getNextOffset();
                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                         long pullRT = System.currentTimeMillis() - beginTimestamp;
                         DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                                                                            pullRequest.getMessageQueue().getTopic(), pullRT);

                         long firstMsgOffset = Long.MAX_VALUE;
                         if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) 
                             DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                          else 
                             firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                             DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                                                                                 pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                             boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                             // 向线程池丢入消费请求任务
                             DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                 pullResult.getMsgFoundList(),
                                 processQueue,
                                 pullRequest.getMessageQueue(),
                                 dispatchToConsume);

                             if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) 
                                 DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                                                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                              else 
                                 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                             
                         

                         if (pullResult.getNextBeginOffset() < prevRequestOffset
                             || firstMsgOffset < prevRequestOffset) 
                             log.warn(
                                 "[BUG] pull message result maybe data wrong, nextBeginOffset:  firstMsgOffset:  prevRequestOffset: ",
                                 pullResult.getNextBeginOffset(),
                                 firstMsgOffset,
                                 prevRequestOffset);
                         

                         break;
                     case NO_NEW_MSG:
                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                         DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                         DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                         break;
                     case NO_MATCHED_MSG:
                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                         DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                         DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                         break;
                     case OFFSET_ILLEGAL:
                         log.warn("the pull request offset illegal,  ",
                                  pullRequest.toString(), pullResult.toString());
                         pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                         pullRequest.getProcessQueue().setDropped(true);
                         DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() 

                             @Override
                             public void run() 
                                 try 
                                     DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                                                                             pullRequest.getNextOffset(), false);

                                     DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                     DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                     log.warn("fix the pull request offset, ", pullRequest);
                                  catch (Throwable e) 
                                     log.error("executeTaskLater Exception", e);
                                 
                             
                         , 10000);
                         break;
                     default:
                         break;
                 
             
         

         @Override
         public void onException(Throwable e) 
             if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) 
                 log.warn("execute the pull request exception", e);
             

             DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
         
     ;

     // ... 省略
     try 
         this.pullAPIWrapper.pullKernelImpl(            // 定义消息拉取核心实现的相关参数:包括拉取方式、回调函数等,最终会通过Netty远程请求消息然后请求成功后调用回调方法
             pullRequest.getMessageQueue(),
             subExpression,
             subscriptionData.getExpressionType(),
             subscriptionData.getSubVersion(),
             pullRequest.getNextOffset(),
             this.defaultMQPushConsumer.getPullBatchSize(),
             sysFlag,
             commitOffsetValue,
             BROKER_SUSPEND_MAX_TIME_MILLIS,
             CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
             CommunicationMode.ASYNC,
             pullCallback
         );
      catch (Exception e) 
         log.error("pullKernelImpl exception", e);
         this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
     
 

? 以上代码注释有三个重点的地方,具体的处理流程大致是这样。首先this.pullAPIWrapper.pullKernelImpl这个方法定义了具体的消息拉取策略,内部实现其实会根据消息类型取拉取消息。对于默认的集群消息模式,实际会调用Netty进行消息拉取,拉取结束后会调用注释中的回调函数进行处理。最终实际会进入DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest,而实际上对于顺序消息消费会进入ConsumeMessageOrderlyServicesubmitConsumeRequest方法。该方法直接向消费线程池中放入一个消费请求任务。

消费请求任务

? 我们继续跟进ConsumeRequest消费请求任务的具体实现:

@Override
public void run() 
    if (this.processQueue.isDropped()) 
        log.warn("run, the message queue not be able to consume, because it's dropped. ", this.messageQueue);
        return;
    

    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) 
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) 
            final long beginTime = System.currentTimeMillis();
            for (boolean continueConsume = true; continueConsume; ) 
                if (this.processQueue.isDropped()) 
                    log.warn("the message queue not be able to consume, because it's dropped. ", this.messageQueue);
                    break;
                

                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && !this.processQueue.isLocked()) 
                    log.warn("the message queue not locked, so consume later, ", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                

                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && this.processQueue.isLockExpired()) 
                    log.warn("the message queue lock expired, so consume later, ", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                

                long interval = System.currentTimeMillis() - beginTime;
                if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) 
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                    break;
                

                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                if (!msgs.isEmpty()) 
                    final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                    ConsumeOrderlyStatus status = null;

                    ConsumeMessageContext consumeMessageContext = null;
                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) 
                        consumeMessageContext = new ConsumeMessageContext();
                        consumeMessageContext
                            .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                        consumeMessageContext.setMq(messageQueue);
                        consumeMessageContext.setMsgList(msgs);
                        consumeMessageContext.setSuccess(false);
                        // init the consume context type
                        consumeMessageContext.setProps(new HashMap<String, String>());
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                    

                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    try 
                        this.processQueue.getLockConsume().lock();
                        if (this.processQueue.isDropped()) 
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. ",
                                     this.messageQueue);
                            break;
                        
                        // 调用注册的listener消费消息,并且得到返回结果
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                     catch (Throwable e) 
                        log.warn("consumeMessage exception:  Group:  Msgs:  MQ: ",
                                 RemotingHelper.exceptionSimpleDesc(e),
                                 ConsumeMessageOrderlyService.this.consumerGroup,
                                 msgs,
                                 messageQueue);
                        hasException = true;
                     finally 
                        this.processQueue.getLockConsume().unlock();
                    

                    if (null == status
                        || ConsumeOrderlyStatus.ROLLBACK == status
                        || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) 
                        log.warn("consumeMessage Orderly return not OK, Group:  Msgs:  MQ: ",
                                 ConsumeMessageOrderlyService.this.consumerGroup,
                                 msgs,
                                 messageQueue);
                    

                    long consumeRT = System.currentTimeMillis() - beginTimestamp;
                    if (null == status) 
                        if (hasException) 
                            returnType = ConsumeReturnType.EXCEPTION;
                         else 
                            returnType = ConsumeReturnType.RETURNNULL;
                        
                     else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) 
                        returnType = ConsumeReturnType.TIME_OUT;
                     else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) 
                        returnType = ConsumeReturnType.FAILED;
                     else if (ConsumeOrderlyStatus.SUCCESS == status) 
                        returnType = ConsumeReturnType.SUCCESS;
                    

                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) 
                        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                    

                    if (null == status) 
                        status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    

                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) 
                        consumeMessageContext.setStatus(status.toString());
                        consumeMessageContext
                            .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                    

                    ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                        .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
                    // 处理Listener的返回结果
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                 else 
                    continueConsume = false;
                
            
         else 
            if (this.processQueue.isDropped()) 
                log.warn("the message queue not be able to consume, because it's dropped. ", this.messageQueue);
                return;
            

            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
        
    

? 可以看出我们开始会调用我们实现的MessageListener对拉取到的消息进行消费,消费完成之后我们会拿到消费结果,并对消费结果进行处理。

消费结果处理(COMMIT ROLLBACK)

? 我们直接跟进消费结果处理代码:

public boolean processConsumeResult(
        final List<MessageExt> msgs,
        final ConsumeOrderlyStatus status,
        final ConsumeOrderlyContext context,
        final ConsumeRequest consumeRequest
    ) 
        boolean continueConsume = true;
        long commitOffset = -1L;
        if (context.isAutoCommit())    // 自动提交的情况下
            switch (status) 
                case COMMIT:
                case ROLLBACK:
                    log.warn("the message queue consume result is illegal, we think you want to ack these message ",
                        consumeRequest.getMessageQueue());
                case SUCCESS:
                    commitOffset = consumeRequest.getProcessQueue().commit();
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    if (checkReconsumeTimes(msgs)) 
                        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                        this.submitConsumeRequestLater(
                            consumeRequest.getProcessQueue(),
                            consumeRequest.getMessageQueue(),
                            context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                     else 
                        commitOffset = consumeRequest.getProcessQueue().commit();
                    
                    break;
                default:
                    break;
            
         else 
            switch (status)    // 非自动提交,需区别对待返回的处理结果
                case SUCCESS:
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    break;
                case COMMIT:
                    commitOffset = consumeRequest.getProcessQueue().commit();
                    break;
                case ROLLBACK:
                    consumeRequest.getProcessQueue().rollback();
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    if (checkReconsumeTimes(msgs)) 
                        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                        this.submitConsumeRequestLater(
                            consumeRequest.getProcessQueue(),
                            consumeRequest.getMessageQueue(),
                            context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    
                    break;
                default:
                    break;
            
        

        if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) 
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
        

        return continueConsume;
    

? 因为我们例子中写的是非自动提交,我们就来看看非自动提交下ROLLBACK和COMMIT的具体实现(对应ProcessQueue的相关方法):

public void rollback() 
    try 
        this.lockTreeMap.writeLock().lockInterruptibly();
        try 
            /**
                 *  当消费到KEY2的时候,因为num=3所以进入rollback方法
                 *  此时:
                 *      this.msgTreeMap包含所有未消费的消息 此时有 KEY3 --- KEY49 
                 *      this.consumingMsgOrderlyTreeMap 有所有按顺序消费过的消息 KEY0 --- KEY2
                 *  不难看出一旦执行rollback,不仅仅是将当前消费的消息重新放入消息队列供再次消费,前面已经处理的消息
                 *  将都会重新放入消息队列供再次消费。也就能解释前面所出现的为什么自动提交设置为false之后,消息重复消费
                 */
            this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
            this.consumingMsgOrderlyTreeMap.clear();
         finally 
            this.lockTreeMap.writeLock().unlock();
        
     catch (InterruptedException e) 
        log.error("rollback exception", e);
    


public long commit() 
    try 
        this.lockTreeMap.writeLock().lockInterruptibly();
        try 
            // 获取已顺序消费消息队列中最后一个消息的偏移值
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            // 原队列消息个数减去已顺序消费但未提交的消息个数为剩下可继续消费的消息个数
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            // 队列消息总长度减去待提交的队列消息总长度
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) 
                msgSize.addAndGet(0 - msg.getBody().length);
            
            // 将已消费未提交的队列列表清空
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) 
                return offset + 1;
            
         finally 
            this.lockTreeMap.writeLock().unlock();
        
     catch (InterruptedException e) 
        log.error("commit exception", e);
    

    return -1;

? 至此,整个简单的消费流程分析完成。

消费流程源码分析总结

  • Pull OR Push:即使是Push模式的Consumer,其最终实现还是是通过Pull的方式来进行的
  • Netty:集群模式的远程消息获取是通过Netty来实现的

总结

? RocketMQ的常用三种消息生产消费模式到现在我们就基本分析完了。个人认为顺序消息消费给需要顺序执行的流程异步实现提供了强有力的支持。这一点特别适用于阿里当前的相关领域。当然RocketMQ也不是尽善尽美的,我个人在测试的时候发现顺序消息消费的性能不算特别高,当然具体什么原因只有留到后续分析了。还有,因为这个项目开始是阿里内部研发的,可能源码注释上相比于其他开源项目还是要少一些,也没有那么清楚。以至于consumer.setConsumeFromWhere这个的不同设值的具体区别在哪我还没有探究出来(想想Spring的事务隔离级别以及传递特性相关常量的注释基本一看就懂了),限于篇幅还有我赶紧赶去上班,就不再继续深究了(后面继续)。

参考链接

http://rocketmq.apache.org/docs/

rocketmq之消息的生产与消费(代码片段)

基本示例中提供了以下两个功能RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。RocketMQ可以用来消费消息。1添加依赖maven:<depen... 查看详情

rocketmq之消息的生产与消费(代码片段)

基本示例中提供了以下两个功能RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。RocketMQ可以用来消费消息。1添加依赖maven:<depen... 查看详情

rocketmq的顺序消息(顺序消费)(代码片段)

简单介绍了消息有序性的概念,以及RocketMQ如何实现消息的顺序消费。文章目录1消息的有序性2生产者有序发送3消费者有序消费1消息的有序性消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订... 查看详情

rocketmq基本概念

#基本概念----##1消息模型(MessageModel)RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消... 查看详情

rocketmq之消息的生产与消费(代码片段)

基本示例中提供了以下两个功能RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。RocketMQ可以用来消费消息。1添加依赖maven:<depen... 查看详情

rocketmq简单的消费者和生产者(示例代码)(代码片段)

一、生产者  使用RocketMQ以三种方式发送消息:可靠的同步,可靠的异步和单向传输。  (1)同步发送消息(可靠的同步传输,适用于重要的短信通知等)publicclassSyncProducerpublicstaticvoidmain(String[]args)throwsException//Instantiatewitha... 查看详情

rocketmq的消息重试

...术A对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ会自动不断进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败... 查看详情

rocketmq顺序消费

参考技术A对于所有的MQ来说,必问的一道面试题就是RocketMQ顺序消息怎样做?原理是什么?首先我们要明确什么顺序消费,顺序消费的定义是什么?我所理解的顺序消费,指的针对某一类消息,比如都是订单A的消息来说,它的消... 查看详情

关于rocketmq的基础api操作——这一篇就够了(代码片段)

关于RocketMQ的基础操作一、基础API操作1、普通消息1.1、消息生产端1.2、消息消费端2、顺序消息2.1、消息生产端2.2、消息消费端3、广播消息3.1、消息生产端3.2、消息消费端4、延迟消息4.1、消息生产端4.2、消息消费端4.3、实现原理... 查看详情

rocketmq(十四)顺序消息(代码片段)

1、什么是顺序消息顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。默认情况下生产者会把消息以RoundRobin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发... 查看详情

rocketmq-如何实现顺序消息

...新增消息、修改消息。如何发送和消费顺序消息我们使用RocketMQ顺序消息来模拟一下订单的场景,顺序消息分为两部分:顺序发送、顺序消费。1.顺序发消息上面代码模拟了按顺序依次发送创建、支付、退款消息到TopicTest中。在ap... 查看详情

rocketmq详解(代码片段)

RocketMQ详解1.基础概念2.RocketMQ消费模式2.1广播模式2.2集群模式3.基础架构3.1Broker的存储结构3.2存储文件简介3.3Consumer端的负载均衡机制3.4消息刷盘机制3.5Mmap+pageCache3.5.1传统缓存IO和Mmap3.5.2pageCache3.5.3预映射机制+文件预热机制3... 查看详情

rocketmq笔记:普通消息(代码片段)

  普通消息为RocketMQ中最基础的消息,支持生产者和消费者的异步解耦通信。一、普通消息的生命周期 1、初始化  消息被生产者构建并完成初始化,待发送到服务端的状态。2、待消费  消息被发送到服务端,对消费... 查看详情

rocketmq使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信

...投递和不重复投递是很难的,就是所谓的有且仅有一次。RocketMQ择了确保一定投递,保证消息不丢失,但有可能造成消息重复。处理消息重复问题,主要有业务端自己保证,主要的方式有两种:业务幂等和消息去重发生了消息积... 查看详情

rocketmq概述

Rocket火箭MQ的作用:同步转异步(异步解耦)。难点:如何确保消息一定被消费,而且仅消费一次。 1、消息架构:生产者、服务器、消费者、路由发现。2、消息顺序:严格按照消息到达服务器的顺序进行消费。3、消息过滤... 查看详情

rocketmq学习笔记:消息发送模式(代码片段)

这是本人学习的总结,主要学习资料如下马士兵教育目录1、消息发送模式2、消息消费模式3、顺序消息的消费和发送3.1、全局顺序3.2、部分顺序3.3、部分顺序代码样例3.3.1、依赖3.3.2、发送信息3.3.3、接受信息4、延时消息的消... 查看详情

rocketmq-消息消费模式顺序消费(代码片段)

RocketMQ-消息消费模式顺序消费RocketMQ-消息消费模式集群模式集群模式的演示(本身就默认)Rocketmq存储队列广播模式顺序消费如何改实现顺序消费RocketMQ-消息消费模式集群模式在消费模式为集群的情况下,如果机器是集群的,消息只会... 查看详情

rocketmq快速入门实战(代码片段)

文章目录1.RocketMQ基础1.1组成结构1.1.1名词解释如下:1.1.2交互过程如下:1.2安装RocketMQ2.快速入门2.1三种消息发送方式2.2消息结构2.2.1基础属性2.2.2扩展属性2.3生产者工程发送同步消息测试2.4消费者工程消费消息测试1.RocketMQ... 查看详情