rocketmq源码(16)—消费者负载均衡服务rebalanceservice入口源码(代码片段)

刘Java 刘Java     2023-02-23     256

关键词:

基于RocketMQ release-4.9.3,深入的介绍了消费者负载均衡服务RebalanceService入口源码。

上一篇文章我们学习了RocketMQ源码(15)—消费者DefaultMQPushConsumer启动主要流程源码

RocketMQ一个消费者组中可以有多个消费者,在集群模式下他们共同消费topic下的所有消息,RocketMQ规定一个消息队列仅能被一个消费者消费,但一个消费者可以同时消费多个消息队列。这就涉及到如何将多个消息队列分配给等多个消费者的问题。

RocketMQ中使用负载均衡服务RebalanceService来专门处理多个消息队列和消费者的对应关系,并且提供了多个不同的消费者负载均衡策略,即如何分配消息队列给这些消费者。

另外,当消费者正常退出,异常关闭通道,或者新加入的时候,同样需要负载均衡服务RebalanceService来进行消息队列分配的重平衡。

更重要的是,一个消费者启动之后,其消费消息的触发并不是pullMessageService消息拉取服务,而真正的源头正是负载均衡服务RebalanceService。因此我们有必要先学习RebalanceService的原理和源码。

文章目录

1 负载均衡or重平衡的触发

有三种情况会触发Consumer进行负载均衡或者说重平衡:

  1. RebalanceService服务是一个线程任务,由MQClientInstance启动,其每隔20s自动进行一次自动负载均衡
  2. Broker触发的重平衡
    1. Broker收到心跳请求之后如果发现消息中有新的consumer连接或者consumer订阅了新的topic或者移除了topic的订阅, 则Broker发送Code为NOTIFY_CONSUMER_IDS_CHANGED的请求给该group下面的所有Consumer,要求进行一次负载均衡。
    2. 如果某个客户端连接出现连接异常事件EXCEPTION、连接断开事件CLOSE、或者连接闲置事件IDLE,则Broker同样会发送重平衡请求给消费者组下面的所有消费者。
  3. 新的Consumer服务启动的时候,主动调用rebalanceImmediately唤醒负载均衡服务rebalanceService,进行重平衡。

1.1 RebalanceService自动重平衡

RebalanceService#run方法,也就是负载均衡服务运行的任务,最多每隔20s执行一次重平衡。主要逻辑是在mqClientFactory#doRebalance方法中实现的。

/**
 * RebalanceServicede 方法
 */
@Override
public void run() 
    log.info(this.getServiceName() + " service started");
    /*
     * 运行时逻辑
     * 如果服务没有停止,则在死循环中执行负载均衡
     */
    while (!this.isStopped()) 
        //等待运行,默认最多等待20s,可以被唤醒
        this.waitForRunning(waitInterval);
        //执行重平衡操作
        this.mqClientFactory.doRebalance();
    

    log.info(this.getServiceName() + " service end");

1.2 Consumer启动重平衡

新的Consumer服务启动的时候,主动调用rebalanceImmediately唤醒负载均衡服务rebalanceService,进行重平衡。

/**
 * MQClientInstance的方法
 * 立即重平衡
 */
public void rebalanceImmediately() 
    //唤醒重平衡服务,立即重平衡
    this.rebalanceService.wakeup();

1.3 Broker请求重平衡

broker触发的重平衡有两种情况:

  1. Broker收到心跳请求之后如果发现消息中有新的consumer连接,或者consumer订阅了新的topic,或者移除了topic的订阅, 则Broker发送Code为NOTIFY_CONSUMER_IDS_CHANGED的请求给该group下面的所有Consumer,要求进行一次负载均衡。
  2. 如果某个客户端连接出现连接异常事件EXCEPTION、连接断开事件CLOSE、或者连接闲置事件IDLE,则Broker同样会发送重平衡请求给消费者组下面的所有消费者。处理入口方法为ClientHousekeepingService# doChannelCloseEvent方法。

新的Consumer和Producer启动的时候,就会发送心跳信息给Broker,MQClientInstance内部的服务也会定时30s发送心跳信息给Broker。关于发送心跳请求sendHeartbeatToAllBrokerWithLock方法的源码,我们在Producer启动的部分就讲过了,我们现在来看看Broker处理心跳请求的源码。

心跳请求的Code为HEART_BEAT,该请求最终被Broker的ClientManageProcessor处理器处理。

/**
 * ClientManageProcessor的方法
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException 
    switch (request.getCode()) 
        //客户端心跳请求
        case RequestCode.HEART_BEAT:
            //客户端心跳请求
            return this.heartBeat(ctx, request);
        case RequestCode.UNREGISTER_CLIENT:
            return this.unregisterClient(ctx, request);
        case RequestCode.CHECK_CLIENT_CONFIG:
            return this.checkClientConfig(ctx, request);
        default:
            break;
    
    return null;

1.3.1 Broker处理心跳请求

Broker的ClientManageProcessor#heartBeat该方法用于Broker处理来自客户端(包括consumer和producer)的心跳请求。主要流程就是:

  1. 解码消息中的信息成为HeartbeatData对象,该对象的结构我们在在Producer启动的部分就讲过了。
  2. 循环遍历处理consumerDataSet集合,对ConsumerData信息进行注册或者更改,如果consumer信息发生了改变,Broker会发送NOTIFY_CONSUMER_IDS_CHANGED请求给同组的所有consumer客户端,要求进行重平衡操作。
  3. 循环遍历处理consumerDataSet集合,对 ProducerData信息进行注册或者更改。
/**
 * ClientManageProcessor的方法
 * <p>
 * 处理客户端心跳请求
 */
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) 
    //构建响应命令对象
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    //解码
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    //构建客户端连接信息对象
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
            ctx.channel(),
            heartbeatData.getClientID(),
            request.getLanguage(),
            request.getVersion()
    );
    /*
     * 1 循环遍历处理consumerDataSet,即处理consumer的心跳信息
     */
    for (ConsumerData data : heartbeatData.getConsumerDataSet()) 
        //查找broker缓存的当前消费者组的订阅组配置
        SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                        data.getGroupName());
        boolean isNotifyConsumerIdsChangedEnable = true;
        //如果已存在订阅组
        if (null != subscriptionGroupConfig) 
            //当consumer发生改变的时候是否支持通知同组的所有consumer,默认true,即支持
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) 
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            
            //尝试创建重试topic
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    newTopic,
                    subscriptionGroupConfig.getRetryQueueNums(),
                    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        
        /*
         * 注册consumer,返回consumer信息是否已发生改变
         * 如果发生了改变,Broker会发送NOTIFY_CONSUMER_IDS_CHANGED请求给同组的所有consumer客户端,要求进行重平衡操作
         */
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable
        );

        if (changed) 
            //如果consumer信息发生了改变,打印日志
            log.info("registerConsumer info changed  ",
                    data.toString(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            );
        
    
    /*
     * 2 循环遍历处理producerDataSet,即处理producer的心跳信息
     */
    for (ProducerData data : heartbeatData.getProducerDataSet()) 
        /*
         * 注册producer
         */
        this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                clientChannelInfo);
    
    //返回响应
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;

1.3.1.1 registerConsumer注册消费者

注册consumer,返回consumer信息是否已发生改变,如果发生了改变,Broker会发送NOTIFY_CONSUMER_IDS_CHANGED请求给同组的所有consumer客户端,要求进行重平衡操作。

/**
 * ConsumerManager的方法
 * <p>
 * 注册consumer,返回consumer信息是否已发生改变
 * 如果发生了改变,Broker会发送NOTIFY_CONSUMER_IDS_CHANGED请求给同组的所有consumer客户端,要求进行重平衡操作
 *
 * @param group                            消费者组
 * @param clientChannelInfo                客户端连接信息
 * @param consumeType                      消费类型,PULL or PUSH
 * @param messageModel                     消息模式,集群 or 广播
 * @param consumeFromWhere                 启动消费位置
 * @param subList                          订阅信息数据
 * @param isNotifyConsumerIdsChangedEnable 一个consumer改变时是否通知该consumergroup中的所有consumer进行重平衡
 * @return 是否重平衡
 */
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
                                ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
                                final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) 
    //获取当前group对应的ConsumerGroupInfo
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    //如果为null,那么新建一个ConsumerGroupInfo并存入consumerTable
    if (null == consumerGroupInfo) 
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    
    /*
     * 1 更新连接
     */
    boolean r1 =
            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                    consumeFromWhere);
    /*
     * 2 更新订阅信息
     */
    boolean r2 = consumerGroupInfo.updateSubscription(subList);
    /*
     * 2 如果连接或者订阅信息有更新,并且允许通知,那么通知该consumergroup中的所有consumer进行重平衡
     */
    if (r1 || r2) 
        if (isNotifyConsumerIdsChangedEnable) 
            //CHANGE事件
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        
    
    //注册订阅信息到ConsumerFilterManager
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

    return r1 || r2;

1.3.1.1.1 updateChannel更新连接

更新此ConsumerGroup组对应的ConsumerGroupInfo的一些属性,并且还会判断当前连接是否是新连接,如果Broker此前没有该连接的信息,那么表示有新的consumer连接到此broker,那么需要通知当前ConsumerGroup的所有consumer进行重平衡。

/**
 * ConsumerGroupInfo的方法
 * <p>
 * 更新连接
 *
 * @param infoNew          新连接信息
 * @param consumeType      消费类型,PULL or PUSH
 * @param messageModel     消息模式,集群 or 广播
 * @param consumeFromWhere 启动消费位置
 * @return 是否通知
 */
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
                             MessageModel messageModel, ConsumeFromWhere consumeFromWhere) 
    boolean updated = false;
    //更新信息
    this.consumeType = consumeType;
    this.messageModel = messageModel;
    this.consumeFromWhere = consumeFromWhere;
    //根据当前连接获取channelInfoTable缓存中的连接信息
    ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
    //如果缓存中的连接信息为null,说明当前连接是一个新连接
    if (null == infoOld) 
        //存入缓存
        ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        //长期按没有该连接信息,那么表示有新的consumer连接到此broekr,那么需要通知
        if (null == prev) 
            log.info("new consumer connected, group:    channel: ", this.groupName, consumeType,
                    messageModel, infoNew.toString());
            updated = true;
        

        infoOld = infoNew;
     else 
        //异常情况
        if (!infoOld.getClientId().equals(infoNew.getClientId())) 
            log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP:  OLD:  NEW:  ",
                    this.groupName,
                    infoOld.toString(),
                    infoNew.toString());
            this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        
    
    //更新更新时间
    this.lastUpdateTimestamp = System.currentTimeMillis();
    infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);

    return updated;

1.3.1.1.2 updateSubscription更新订阅信息

更新此ConsumerGroup组对应的订阅信息集合,如果存在新增订阅的topic,或者移除了对于某个topic的订阅,那么需要通知当前ConsumerGroup的所有consumer进行重平衡。

该方法的大概步骤为:

  1. 该方法首先遍历当前请求传递的订阅信息集合,然后对于每个订阅的topic从subscriptionTable缓存中尝试获取,如果获取不到则表示新增了topic订阅信息,那么将新增的信息存入subscriptionTable。
  2. 然后遍历subscriptionTable集合,判断每一个topic是否存在于当前请求传递的订阅信息集合中,如果不存在,表示consumer移除了对于该topic的订阅,那么当前topic的订阅信息会从subscriptionTable集合中被移除。

这里的源码实际上很重要,他向我们传达出了什么信息呢?那就是RocketMQ需要保证组内的所有消费者订阅的topic都必须一致,否则就会出现订阅的topic被覆盖的情况。

根据刚才的源码分析,假设一个消费者组groupX里面有两个消费者,A消费者先启动并且订阅topicA,A消费者向broker发送心跳,那么subscriptionTable中消费者组groupX里面仅有topicA的订阅信息。

随后B消费者启动并且订阅topicB,B消费者也向broker发送心跳,那么根据该方法的源码,subscriptionTable中消费者组groupX里面的topicA的订阅信息将会被移除,而topicB的订阅信息会被存入进来。

这样就导致了topic订阅信息的相互覆盖,导致其中一个消费者能够消费消息,而另一个消费者不会消费。

/**
 * ConsumerGroupInfo的方法
 * 更新订阅信息
 *
 * @param subList 订阅信息集合
 */
public boolean updateSubscription(final Set<SubscriptionData> subList) 
    boolean updated = false;
    //遍历订阅信息集合
    for (SubscriptionData sub : subList) 
        //根据订阅的topic在ConsumerGroup的subscriptionTable缓存中此前的订阅信息
        SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
        //如果此前没有关于该topic的订阅信息,那么表示此topic为新增订阅
        if (old == null) 
            //存入subscriptionTable
            SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            //此前没有关于该topic的订阅信息,那么表示此topic为新增订阅,那么需要通知
            if (null == prev) 
                updated = true;
                log.info("subscription changed, add new topic, group:  ",
                        this.groupName,
                        sub.toString());
            
         else if (sub.getSubVersion() > old.getSubVersion()) 
            //更新数据
            if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) 
                log.info("subscription changed, group:  OLD:  NEW: ",
                        this.groupName,
                        old.toString(),
                        sub.toString()
                );
            

            this.subscriptionTable.put(sub.getTopic(), sub);
        
    
    /*
     * 遍历ConsumerGroup的subscriptionTable缓存
     */
    Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
    while (it.hasNext()) 
        Entry<String, SubscriptionData> next = it.next();
        //获取此前订阅的topic
        String oldTopic = next.getKey();

        boolean exist = false;
        //判断当前的subList是否存在该topic的订阅信息
        for (SubscriptionData sub : subList) 
            //如果存在,则退出循环
            if (sub.getTopic().equals(oldTopic)) 
                exist = true;
                break;
            
        
        //当前的subList不存在该topic的订阅信息,说明consumer移除了对于该topic的订阅
        if (!exist) 
            log.warn("subscription changed, group:  remove topic  ",
                    this.groupName,
                    oldTopic,
                    next.getValue().toString()
            );
            //移除数据
            it.remove();
            //那么需要通知
            updated = true;
        
    

    this.lastUpdateTimestamp = System.currentTimeMillis();

    return updated;

1.3.1.1.3 consumerIdsChangeListener.handle监听器通知

该方法通知监听器处理对应的事件,需要进行通知的事件为ConsumerGroupEvent.CHANGE。

可以看到该方法中对于ConsumerGroupEvent.CHANGE事件的处理为:如果允许通知,则遍历该ConsumerGroup的连接集合,然后对每个连接调用notifyConsumerIdsChanged方法通知对应的客户端消费者执行负载均衡。

/**
 * DefaultConsumerIdsChangeListener的方法
 * 
 * 处理监听到的事件
 * @param event 事件
 * @param group 消费者组
 * @param args 参数
 */
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) 
    if (event == null) 
        return;
    
    switch (event) 
        //改变事件,需要通知该消费者组的每一个消费者
        case CHANGE:
            if (args == null || args.length < 1) 
                return;
            
            //获取参数
            List<Channel> channels = (List<Channel>) args[0];
            //如果允许通知
            if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) 
                //遍历连接集合
                for (Channel chl : channels) 
                    //通知该消费者客户端执行负载均衡
                    this.brokerController.getBroker2Client()查看详情  

rocketmq源码(17)—rebalanceservice消费者负载均衡过程源码(代码片段)

基于RocketMQrelease-4.9.3,深入的介绍了消费者负载均衡服务RebalanceService的具体负载均衡过程的源码。上一篇文章我们学习了RocketMQ源码(16)—消费者负载均衡服务RebalanceService入口源码。现在我们将会介绍具体的负载的过程源码... 查看详情

深入剖析rocketmq源码-负载均衡机制(代码片段)

目录一、引言二、RocketMQ的整体架构三、producer消息生产过程3.1路由同步过程3.2 负载均衡过程四、consumer消息消费过程4.1路由同步过程4.2 负载均衡过程五、RocketMQ指定机器消费设计思路六、小结一、引言RocketMQ是一款优秀的分布... 查看详情

源码分析rocketmq系列索引

...分析RocketMQ之消息消费5、源码分析RocketMQ消息消费机制----消费者拉取消息机制6、源码分析RocketMQ消息消费机制----消费端消息负载均衡机制与重新分布7、源码分析RocketMQ消息重试机制8、源码分析RocketMQ消费ACK机制之消息进度9、源... 查看详情

一文讲透rocketmq消费者是如何负载均衡的

...消费:同一Topic下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。广播消费:当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少... 查看详情

rocketmq的消费者集群消费消息,实现负载均衡

packagecom.bfxy.rocketmq.model;importjava.util.List;importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;importorg.apache.rocketmq.cli... 查看详情

rocketmq源码(18)—defaultmqpushconsumer消费者发起拉取消息请求源码(代码片段)

基于RocketMQrelease-4.9.3,深入的介绍了DefaultMQPushConsumer消费者发起拉取消息请求源码。此前我们学习了DefaultMQPushConsumer负载均衡的源码,同时我们也知道了,最初始的PullRequest,就是在负载均衡之时对于新分配到的... 查看详情

rocketmq消费者消息队列负载均衡

...消息队列负载,获取主题队列信息mqSet与消费组当前所有消费者cidAll,然后按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消息队列,同一个消息消费队列同一时间只会分配给一个消费者。此时,可... 查看详情

rocketmq源码解析-消息消费(代码片段)

RocketMQ源码解析-消息消费1.消费者相关类2.消费者的启动3.消息的拉取4.消费者的负载均衡5.消息的消费6.消费进度管理看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地... 查看详情

8springboot整合rocketmq实现消费者广播模式和负载均衡模式(代码片段)

如上图,假如我们有多个消费者,消息生产者发送的消息,是每一个消费者都消费一次呢?还是通过一些机制,比如轮询机制,每个消息只被某一个消费者消费一次呢?这里涉及到消费者的消费模式... 查看详情

rocketmq源码(18)—defaultmqpushconsumer消费者发起拉取消息请求源码(代码片段)

基于RocketMQrelease-4.9.3,深入的介绍了DefaultMQPushConsumer消费者发起拉取消息请求源码。此前我们学习了DefaultMQPushConsumer负载均衡的源码,同时我们也知道了,最初始的PullRequest,就是在负载均衡之时对于新分配到的... 查看详情

rocketmq(三)——系统架构

...消息。一个生产者组可以同时发送多个主题的消息。消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理RocketMQ中的消息消费者都是以消费者组(ConsumerGroup)的形式出现的。消费... 查看详情

rocketmq消息负载均衡策略解析——图解源码级解析(代码片段)

...#x1f34a;Java学习:Java从入门到精通总结🍊深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想🍊绝对不一样的职场干货:大厂最佳实践经验指南📆最近更新:2022年6月25日🍊个人简介:通信工程... 查看详情

rocketmq

消费流程消费者组:一个逻辑概念,在使用消费者时需要指定一个组名。一个消费者组可以订阅多个Topic。消费者实例:一个消费者组程序部署了多个进程,每个进程都可以称为一个消费者实例。订阅关系:一个消费者组订阅一... 查看详情

ribbon源码之负载均衡算法

IRule  负载均衡器用来选择服务器的规则。publicinterfaceIRule{publicServerchoose(Objectkey);publicvoidsetLoadBalancer(ILoadBalancerlb);publicILoadBalancergetLoadBalancer();}  通过BaseLoadBalancer的setRule或构造函数来为BaseLoadBa 查看详情

springcloud之ribbon负载均衡

...表后,Ribbon就可基于某种负载均衡算法,自动的帮助服务消费者去请求。Ribbon默认为我们提供了很多的负载均衡算法,例如轮询、随机等,也可以实现自定义的负载均衡算法。和Eureka配合实现如下架构:新建一个服务消费者,融... 查看详情

springcloud-远程调用和负载均衡

...高并发的压力,同一个服务可能会启动多个实例。这时候消费者就需要负载均衡,把请求分散到各个实例。负载均衡主要有两种设计:服务端负载均衡客户端负载均衡对于传统的分布式服务来说,大多使用服务端负载均衡。一般... 查看详情

springcloud学习系列之二-----服务消费者(feign)和负载均衡(ribbon)

前言本篇主要介绍的是SpringCloud中的服务消费者(Feign)和负载均衡(Ribbon)功能的实现以及使用Feign结合Ribbon实现负载均衡。SpringCloudFeignFeign介绍Feign是一个声明式的WebService客户端,它使得编写WebSerivce客户端变得更加简单。我们只需... 查看详情