rocketmq源码解析-nameserver篇(代码片段)

_微风轻起 _微风轻起     2022-12-04     473

关键词:

在这一篇我们主要来看下NameServer是怎样保存topicbrokercluster这些信息的。

一、rocketmq-namesrv模块基本介绍

​ 可以看到在它的源码中,namesrv其只有这些类(当然也有使用rocketMQ源码的其他的模块)。NamesrvController完成netty连接的初始化,然后DefaultRequestProcessor负责完成客户端具体要做的操作的派发,而关于注册信息相关的保存以及获取就是RouteInfoManager

二、重要类介绍

1、DefaultRequestProcessor

​ 这个类就是对客户端例如broker注册、producer获取对应broker信息等具体操作的派发

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException 
		...........
    switch (request.getCode()) 
        ..........
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) 
                return this.registerBrokerWithFilterServer(ctx, request);
             else 
                return this.registerBroker(ctx, request);
            
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINFO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    
    return null;

​ 这里就有如RequestCode.REGISTER_BROKER其就是处理broker的注册请求的、RequestCode.GET_TOPICS_BY_CLUSTER客户端获取namesrv保存的topic信息、客户端获取以及注册的broker信息GET_BROKER_CLUSTER_INFOGET_ROUTEINFO_BY_TOPIC获取Topic路由信息等。

1)、registerBroker

​ 这个方法就是处理broker注册请求的

public RemotingCommand registerBroker(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException 
 		.........
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        topicConfigWrapper,
        null,
        ctx.channel()
    );

    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;

​ 这里就会交给RouteInfoManager来请求。

2)、getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException 
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) 
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) 
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    

    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;

​ 获取TopicRouteData路由信息也是交给RouteInfoManager。下面我们就来具体看下RouteInfoManager

2、RouteInfoManager

public class RouteInfoManager 
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

这个类我们主要是要知道这几个成员变量保存的值就可以了,因为关于broker注册、Topic路由信息都是获取这些变量中的值。

1)、前置介绍

​ 在了解这些变量的时候,我有配置3台机的集群关系,建立了两个broker集群:DefaultClusterDefaultCluster-2

DefaultCluster由双主双从构成,其主要构成DefaultCluster集群,broker-xx.properties配置的部分内容

192.168.127.128:broker-a

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0

192.168.127.128:broker-b-s

brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1

192.168.127.129:broker-a-s

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1

192.168.127.129:broker-b

brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0

另一个集群我用的一台机192.168.127.130正常不会怎样做(而且我发送消息的时候,发送的消息会在两个broker集群上都会有,要实现集群发消息隔离,应该需要不同集群分别注册到不同的nameServer上)

192.168.127.129:broker-c

brokerClusterName=DefaultCluster-2
brokerName=broker-c
brokerId=0

192.168.127.129:broker-a-s

brokerClusterName=DefaultCluster-2
brokerName=broker-c-s
brokerId=1

​ 下面我们就来看下其具体保存的信息

2)、topicQueueTable(HashMap<String, List<QueueData>>)

public class QueueData implements Comparable<QueueData> 
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSysFlag;

topicQueueTable就是保存topicbroker的对应关系,例如cluster_queue_topic,这个topic在我们搭建的集群中,其每台broker默认是4个队列,然后发送的消息会分布在3台broker的节点上面–broker-abroker-bbroker-c,也就是说一个topic会有3*4个MessageQueue,例如在producer发送消息的时候,就会在12个MessageQueue中选一个:

​ 同时这个topic的路由信息:

​ 所以我们就可以通过topicQueueTable来知道topic发布在哪些broker上,以及整个topicqueue分布情况。

3)、brokerAddrTable(HashMap<String, BrokerData> )

public class BrokerData implements Comparable<BrokerData> 
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

​ 这个就是保存brokerName对应的broker启动的实例的主从实例的分布情况:

例如broker-b所属的集群是DefaultCluster,其有两个实例其中一个主节点(breokerId=0)在192.168.127.129,有一个从节点,这个从节点(brokerId>0)在192.168.127.128

4)、clusterAddrTable(HashMap<String, Set<String>>)

​ 这个就是保存集群有哪些brokerName

5)、brokerLiveTable(HashMap<String, BrokerLiveInfo>)

​ 这个主要是用来维护上次心跳的维持时间

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
class BrokerLiveInfo 
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;

6)、registerBroker(REGISTER_BROKER)

public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) 
    RegisterBrokerResult result = new RegisterBrokerResult();
    try 
        try 
            this.lock.writeLock().lockInterruptibly();
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) 
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) 
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            
        		..........
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) 
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) 
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) 
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) 
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        
                    
                
            
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
			.........
         finally 
            this.lock.writeLock().unlock();
        
     catch (Exception e) 
        log.error("registerBroker Exception", e);
    
    return result;

private final ReadWriteLock lock = new ReentrantReadWriteLock();
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) 
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());

    List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
    if (null == queueDataList) 
        queueDataList = new LinkedList<QueueData>();
        queueDataList.add(queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
        log.info("new topic registered,  ", topicConfig.getTopicName(), queueData);
     else 
      ..........
    

​ 这里更新的时候首先是上锁,然后就是将客户端实例broker注册的信息构建保存到上面的那些变量中。

7)、pickupTopicRouteData(GET_ROUTEINFO_BY_TOPIC)

public TopicRouteData pickupTopicRouteData(final String topic) 
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

 

rocketmq源码系列nameserver核心源码解析(代码片段)

目录一、NameServer介绍二、NameServer功能列表三、NameServer架构分析四、NameServer工程目录解析五、NameServer启动流程分析1) 创建NameSrvController2) 执行initialize()加载需要的配置3) 启动server六、NameServer核心源码解析1.路由注册1) broker向N... 查看详情

rocketmq源码—nameserver启动流程源码解析(代码片段)

详细介绍了RocketMQ的NameServer启动流程源码解析,包括RocketMQ的RPC通信模型。文章目录0NameServer概述1NamesrvStartup启动入口2createNamesrvController创建NamesrvController2.1newNamesrvController创建控制器3start启动NamesrvController3.1initia 查看详情

7rocketmq源码解析之broker启动(下)(代码片段)

在前面的一篇文章–6、RocketMQ源码解析之Broker启动(上)分析了一下Broker在启动的时候他自身做了哪些事,以及把Broker相关的信息注册到NameServer中去,下面我们就通过Broker把元信息注册到NameServer来了解一下RocketMQ是如何进... 查看详情

rocketmq源码解析-store篇(代码片段)

这一篇我们主要来梳理下`RocketMQ`消息的存储,这一块的逻辑主要是在`rocketmq-store`模块​我们对于这个模块的逻辑梳理主要是借助这些测试类来debug分析主要是MappedFileQueue、MappedFile、CommitLog、MessageStore、ConsumeQueue... 查看详情

5rocketmq源码解析之命名服务启动(代码片段)

在RocketMQ当中,消息发送方以及消息接收方都是配置命名服务(NameServer)的地址。通过命名服务解耦合了消息发送者以及消息接收方,不同于Kafka直接连接Broker地址。命名服务的主要功能包含:Broker管理以及消息的路由... 查看详情

rocketmq源码—broker与nameserver的心跳服务源码(代码片段)

详细介绍了RocketMQ的Broker与NameServer的心跳服务源码,主要包括三部分:Broker发送心跳注册请求源码;NameServer处理心跳注册请求源码;NameServer的心跳检测服务源码;文章目录1Broker发送心跳注册请求1.1发送心跳... 查看详情

rocketmq原理解析-nameserver

Namesrv名称服务,是没有状态可集群横向扩展。1.每个broker启动的时候会向namesrv注册2.Producer发送消息的时候根据topic获取路由到broker的信息3.Consumer根据topic到namesrv获取topic的路由到broker的信息一:Namesrv功能:    接... 查看详情

rocketmq源码解析-store篇(代码片段)

这一篇我们主要来梳理下`RocketMQ`消息的存储,这一块的逻辑主要是在`rocketmq-store`模块​我们对于这个模块的逻辑梳理主要是借助这些测试类来debug分析主要是MappedFileQueue、MappedFile、CommitLog、MessageStore、ConsumeQueue... 查看详情

8rocketmq源码解析之消息发送(代码片段)

...ver以及消息管理Broker。下面我们就可以进行消息发送了。RocketMQ支持三种消息发送方式:同步消息发送(sync):当Producer发送消息到Broker时会同步等待消息处理结果异步消息发送(async):当Producer发送消息到Broker时会指定... 查看详情

8rocketmq源码解析之消息发送(代码片段)

...ver以及消息管理Broker。下面我们就可以进行消息发送了。RocketMQ支持三种消息发送方式:同步消息发送(sync):当Producer发送消息到Broker时会同步等待消息处理结果异步消息发送(async):当Producer发送消息到Broker时会指定... 查看详情

基于源码搭建运行rocketmq主从架构(代码片段)

前言上一篇基于IDEA搭建RocketMQ-4.6源码环境我们搭建并跑通了rocketmq的源码环境.本文我们紧接上文,继续基于源码搭建并运行broker主从架构.1个NameServer节点(与前文一样)2个Broker节点,一个作为Master,一个作为Slave1个Producer生产者(与前... 查看详情

rocketmq原理解析

参考技术ARocketMQ原理解析说明:NameServer是没有状态的,即NameServer中的Broker和topic等状态信息(通过其他角色上报获取)都是保存在内存中的,不会持久化存储(可通过配置实现),集群可以横向扩展。主要功能如下:a.接收Broker... 查看详情

rocketmq源码—producer生产者启动源码一万字(代码片段)

基于RocketMQ4.9.3,详细介绍了RocketMQ的客户端Producer生产者启动的源码。Nameserver和Broker启动之后,RocketMQ就可以使用了。我们先开看看客户端生产者的启动流程源码。源码版本为4.9.3。文章目录1创建DefaultMQProducer实例2start启... 查看详情

rocketmq源码—producer生产者启动源码一万字(代码片段)

详细介绍了RocketMQ的客户端Producer生产者启动的源码。Nameserver和Broker启动之后,RocketMQ就可以使用了。我们先开看看客户端生产者的启动流程源码。源码版本为4.9.3。文章目录1创建DefaultMQProducer实例2start启动生产者2.1getOrCreateMQ... 查看详情

rocketmq源码学习笔记(代码片段)

...境配置1.1、下载源码下载地址:https://github.com/apache/rocketmqgitclonehttps://github.com/apache/rocketmq.git1.2、导入maven工程到IDE1.3、准备启动的配置1)在下载的rocketmq根目录创建新文件夹conf2)把rocketmq\\distribution\\conf下的broker.conf... 查看详情

idea启动rocketmq源码

在github上拉取rocketmq源码之后,尝试启动NameServer。在idea的配置应用程序中指定启动要读取的环境路径key值在org.apache.rocketmq.common.MixAll文件中在mq的运行主目录中创建conf、logs文件夹,conf存放的文件去rocketmq-distribution模块... 查看详情

rocketmq入门到精通—rocketmq学习入门指南|rocketmq服务发现(nameserver)精讲

...源​​经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】​​NameServer前提概要RocketMQ中,NameServers被设计用来做简单的路由管理。其职责包括。Brokers定期向每个NameServer注册路由数据(topic以及生产者信息\\消费... 查看详情

(转)rocketmq源码学习--消息存储篇

...tp://www.tuicool.com/articles/umQfMzA1.序言今天来和大家探讨一下RocketMQ在消息存储方面所作出的努力,在介绍RocketMQ的存储模型之前,可以先探讨一下MQ的存储模型选择。2.MQ的存储模型选择个人看来,从MQ的类型来看,存储模型分两种:... 查看详情