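Preface
In Kafka 3.0 Source Code Notes (1): The Network Communication Architecture of the Kafka Server, I introduced the components that make up Kafka 3.0. Following that structure, this article is divided into three parts; the main sequence is shown in the figure below:
- The startup flow of the Kafka server
- How the Kafka server handles new connections
- How the Kafka server processes requests
Source code analysis
1. The startup flow of the Kafka server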
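- The Kafka server starts from the Kafka.scala#main() method. The main steps are:
  - Call Kafka.scala#getPropsFromArgs() to load the configuration file named in the startup arguments into memory
  - Call Kafka.scala#buildServer() to create the Kafka server instance
  - Call the interface method Server.scala#startup() on the created instance to start the server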
    def main(args: Array[String]): Unit = {
      try {
        val serverProps = getPropsFromArgs(args)
        val server = buildServer(serverProps)

        try {
          if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
            new LoggingSignalHandler().register()
        } catch {
          case e: ReflectiveOperationException =>
            warn("Failed to register optional signal handler that logs a message when the process is terminated " +
              s"by a signal. Reason for registration failure is: $e", e)
        }

        // attach shutdown handler to catch terminating signals as well as normal termination
        Exit.addShutdownHook("kafka-shutdown-hook", {
          try server.shutdown()
          catch {
            case _: Throwable =>
              fatal("Halting Kafka.")
              // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
              Exit.halt(1)
          }
        })

        try server.startup()
        catch {
          case _: Throwable =>
            // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
            fatal("Exiting Kafka.")
            Exit.exit(1)
        }

        server.awaitShutdown()
      }
      catch {
        case e: Throwable =>
          fatal("Exiting Kafka due to fatal exception", e)
          Exit.exit(1)
      }
      Exit.exit(0)
    }
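The ordering in main() (register a shutdown hook, start the server, then block until shutdown) can be illustrated with a small self-contained sketch. Everything here is hypothetical and is not Kafka code: DemoServer stands in for the Server trait, RecordingServer and DemoLauncher are invented for illustration, and the sketch returns an exit status instead of calling Exit.exit(). It also catches only non-fatal exceptions, whereas the original catches Throwable.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
import scala.util.control.NonFatal

// Hypothetical stand-in for Kafka's Server trait; only three lifecycle calls are modeled.
trait DemoServer {
  def startup(): Unit
  def shutdown(): Unit
  def awaitShutdown(): Unit
}

// Toy server that records lifecycle events in a thread-safe queue.
final class RecordingServer extends DemoServer {
  val events = new ConcurrentLinkedQueue[String]()
  private val stopped = new CountDownLatch(1)

  override def startup(): Unit = { events.add("startup"); () }
  override def shutdown(): Unit = { events.add("shutdown"); stopped.countDown() }
  // Blocks the caller until shutdown() has been invoked.
  override def awaitShutdown(): Unit = stopped.await()
}

object DemoLauncher {
  // Registers the hook first, then starts the server, then waits for shutdown.
  // Returns 0 on a clean run and 1 if startup fails.
  def run(server: DemoServer, registerHook: Thread => Unit): Int = {
    val hook = new Thread(new Runnable {
      override def run(): Unit = server.shutdown()
    }, "demo-shutdown-hook")
    registerHook(hook)

    val started =
      try { server.startup(); true }
      catch { case NonFatal(_) => false }

    if (!started) 1
    else {
      server.awaitShutdown()
      0
    }
  }
}
```

In a real process the hook would typically be passed to Runtime.getRuntime.addShutdownHook; taking the registration as a parameter keeps the sketch easy to exercise.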
- The core of Kafka.scala#getPropsFromArgs() is a call to Utils#loadProps(), which loads the specified configuration file. The logic is straightforward, so it is not examined in depth here.

    def getPropsFromArgs(args: Array[String]): Properties = {
      val optionParser = new OptionParser(false)
      val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
        .withRequiredArg()
        .ofType(classOf[String])
      // This is just to make the parameter show up in the help output, we are not actually using this due the
      // fact that this class ignores the first parameter which is interpreted as positional and mandatory
      // but would not be mandatory if --version is specified
      // This is a bit of an ugly crutch till we get a chance to rework the entire command line parsing
      optionParser.accepts("version", "Print version information and exit.")

      if (args.length == 0 || args.contains("--help")) {
        CommandLineUtils.printUsageAndDie(optionParser,
          "USAGE: java [options] %s server.properties [--override property=value]*".format(this.getClass.getCanonicalName.split('$').head))
      }

      if (args.contains("--version")) {
        CommandLineUtils.printVersionAndDie()
      }

      val props = Utils.loadProps(args(0))

      if (args.length > 1) {
        val options = optionParser.parse(args.slice(1, args.length): _*)

        if (options.nonOptionArguments().size() > 0) {
          CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
        }

        props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
      }
      props
    }
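The idea of loading a base properties file and then letting --override values win can be sketched with only java.util.Properties. This is a simplified illustration, not Kafka's implementation: DemoProps, parseOverrides and load are hypothetical names, the base configuration is passed as text instead of a file path, and the key=value parsing is an assumption that may differ from what CommandLineUtils.parseKeyValueArgs does.

```scala
import java.io.StringReader
import java.util.Properties

object DemoProps {
  // Parses "key=value" strings. Splits on the first '=' only, so values may contain '='.
  def parseOverrides(args: Seq[String]): Map[String, String] =
    args.map { arg =>
      val idx = arg.indexOf('=')
      require(idx > 0, s"Invalid override, expected key=value: $arg")
      arg.substring(0, idx).trim -> arg.substring(idx + 1).trim
    }.toMap

  // Loads the base properties (standing in for server.properties),
  // then applies the overrides on top so that override values take precedence.
  def load(baseText: String, overrides: Seq[String]): Properties = {
    val props = new Properties()
    props.load(new StringReader(baseText))
    parseOverrides(overrides).foreach { case (k, v) => props.setProperty(k, v) }
    props
  }
}
```

For example, DemoProps.load("num.io.threads=8\n", Seq("num.io.threads=16")) yields a Properties object in which num.io.threads is 16.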
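- Creating the server instance in Kafka.scala#buildServer() is a key step. The points to note are:
  - KafkaConfig.scala#fromProps() converts the configuration loaded into memory into a KafkaConfig object
  - KafkaConfig.scala#requiresZookeeper() determines the startup mode of the Kafka server, mainly by checking whether the process.roles setting is present. If it is present, the server starts in KRaft mode, which removes the ZooKeeper dependency; otherwise it starts in the legacy mode that depends on ZooKeeper
  - This article is based on Kafka 3.0. KRaft support in this version is already fairly usable (although the 3.0 release did not yet recommend it for production), so the analysis uses KRaft mode as the example. A KafkaRaftServer object is created here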
    private def buildServer(props: Properties): Server = {
      val config = KafkaConfig.fromProps(props, false)
      if (config.requiresZookeeper) {
        new KafkaServer(
          config,
          Time.SYSTEM,
          threadNamePrefix = None,
          enableForwarding = false
        )
      } else {
        new KafkaRaftServer(
          config,
          Time.SYSTEM,
          threadNamePrefix = None
        )
      }
    }
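The mode decision described above, where the presence of process.roles selects KRaft mode, can be sketched as follows. DemoMode, ZkMode, KRaftMode and DemoModeSelector are hypothetical names used only for illustration, and treating a blank value as "no roles" is an assumption; the real check is KafkaConfig's requiresZookeeper.

```scala
import java.util.Properties

// Hypothetical startup modes used only in this sketch.
sealed trait DemoMode
case object ZkMode extends DemoMode
case object KRaftMode extends DemoMode

object DemoModeSelector {
  // Parses the comma-separated process.roles value into a set of role names.
  // A missing or blank setting yields an empty set.
  def roles(props: Properties): Set[String] =
    Option(props.getProperty("process.roles"))
      .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
      .getOrElse(Set.empty[String])

  // No roles configured means the legacy ZooKeeper-based mode; otherwise KRaft mode.
  def select(props: Properties): DemoMode =
    if (roles(props).isEmpty) ZkMode else KRaftMode
}
```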
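- Scala's syntax differs from Java's in many ways. For example, in Scala the primary constructor is tied directly to the class declaration, so the statements in the class body run when an instance is created. Creating a KafkaRaftServer object therefore triggers the creation of several key member objects. The ones directly relevant to this article are:
  - broker: a BrokerServer object, created only when the node's process.roles setting includes the broker role. It handles data requests, such as producing and consuming messages
  - controller: a ControllerServer object, created only when the node's process.roles setting includes the controller role. It handles metadata requests, such as creating and deleting topics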
    class KafkaRaftServer(
      config: KafkaConfig,
      time: Time,
      threadNamePrefix: Option[String]
    ) extends Server with Logging {

      KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
      KafkaYammerMetrics.INSTANCE.configure(config.originals)

      private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)

      private val metrics = Server.initializeMetrics(
        config,
        time,
        metaProps.clusterId
      )

      private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
        RaftConfig.parseVoterConnections(config.quorumVoters))

      private val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
        metaProps,
        config,
        new MetadataRecordSerde,
        KafkaRaftServer.MetadataPartition,
        KafkaRaftServer.MetadataTopicId,
        time,
        metrics,
        threadNamePrefix,
        controllerQuorumVotersFuture
      )

      private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
        Some(new BrokerServer(
          config,
          metaProps,
          raftManager,
          time,
          metrics,
          threadNamePrefix,
          offlineDirs,
          controllerQuorumVotersFuture,
          Server.SUPPORTED_FEATURES
        ))
      } else {
        None
      }

      private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
        Some(new ControllerServer(
          metaProps,
          config,
          raftManager,
          time,
          metrics,
          threadNamePrefix,
          controllerQuorumVotersFuture
        ))
      } else {
        None
      }

      ...
    }
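To make the Scala constructor behavior concrete, here is a minimal sketch in which the class body acts as the primary constructor and optional members are created according to the configured roles. DemoComponent and DemoRaftServer are hypothetical classes, and roles are modeled as plain strings rather than Kafka's role types.

```scala
import scala.collection.mutable

// Hypothetical component standing in for BrokerServer / ControllerServer.
final class DemoComponent(val name: String, log: mutable.Buffer[String]) {
  // This statement belongs to the primary constructor and runs on `new`.
  log += s"created $name"
}

final class DemoRaftServer(roles: Set[String]) {
  val log: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

  // Statements in the class body run top to bottom while the instance is constructed.
  log += "constructor body started"

  // Each member exists only if the matching role is configured.
  val broker: Option[DemoComponent] =
    if (roles.contains("broker")) Some(new DemoComponent("broker", log)) else None

  val controller: Option[DemoComponent] =
    if (roles.contains("controller")) Some(new DemoComponent("controller", log)) else None
}
```

Constructing new DemoRaftServer(Set("broker")) runs the whole body once, so the broker member is created and the controller member is None by the time the constructor returns.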
- After the steps above, the Kafka server's Server object has been created; in this case it is a KafkaRaftServer. The third sub-step of step 1 in this section then calls KafkaRaftServer.scala#startup() to start the server. The points relevant to this article are:
  - controller.foreach(_.startup()) starts the ControllerServer that may exist on the node by calling ControllerServer.scala#startup()
  - broker.foreach(_.startup()) starts the BrokerServer that may exist on the node by calling BrokerServer.scala#startup()

  This article uses the startup of BrokerServer as the example. From the perspective of network communication structure, BrokerServer and ControllerServer are almost identical.

    override def startup(): Unit = {
      Mx4jLoader.maybeLoad()
      raftManager.startup()
      controller.foreach(_.startup())
      broker.foreach(_.startup())
      AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
      info(KafkaBroker.STARTED_MESSAGE)
    }
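The Option.foreach idiom used in startup() means "start this component if it exists on the node". A minimal sketch of that idiom and of the resulting start order follows; DemoStartable and DemoNode are hypothetical names, and the components only record that they were started.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical startable component that appends its name to a shared log when started.
final class DemoStartable(name: String, log: ArrayBuffer[String]) {
  def startup(): Unit = { log += name; () }
}

final class DemoNode(hasController: Boolean, hasBroker: Boolean) {
  val log: ArrayBuffer[String] = ArrayBuffer.empty[String]

  private val raftManager = new DemoStartable("raftManager", log)
  private val controller: Option[DemoStartable] =
    if (hasController) Some(new DemoStartable("controller", log)) else None
  private val broker: Option[DemoStartable] =
    if (hasBroker) Some(new DemoStartable("broker", log)) else None

  // Same order as in the listing: raft manager, then controller, then broker.
  // foreach on None is a no-op, so absent components are simply skipped.
  def startup(): Unit = {
    raftManager.startup()
    controller.foreach(_.startup())
    broker.foreach(_.startup())
  }
}
```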
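- BrokerServer.scala#startup() is a long method. Only two parts matter for network communication: creating, configuring and starting SocketServer, the low-level network server, and creating and starting KafkaRequestHandlerPool, the upper-level pool of request handlers. The key objects involved are:
  - kafkaScheduler: a KafkaScheduler object, the thread pool for scheduled tasks
  - metadataCache: a KRaftMetadataCache object, the component that manages cluster metadata
  - clientToControllerChannelManager: a BrokerToControllerChannelManager object, which manages the connection from the broker to the controller
  - forwardingManager: a ForwardingManagerImpl object, which holds clientToControllerChannelManager and forwards requests that should be handled by the controller
  - socketServer: a SocketServer object, the server object facing the low-level network
  - _replicaManager: a ReplicaManager object, the replica manager responsible for storing and reading messages
  - groupCoordinator: a GroupCoordinator object, the coordinator for ordinary consumer groups, which helps coordinate the assignment of partitions among the consumers in a group
  - dataPlaneRequestProcessor: a KafkaApis object, the upper-level request processor. It holds the low-level network server's request queue socketServer.dataPlaneRequestChannel and processes the requests taken from that queue
  - dataPlaneRequestHandlerPool: a KafkaRequestHandlerPool object, the upper-level thread pool of request handlers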
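The relationship between the request channel and the handler pool can be pictured as a bounded queue shared by the network layer and a fixed set of handler threads. The sketch below illustrates that producer/consumer shape only; DemoRequest, DemoRequestChannel and DemoHandlerPool are hypothetical, and the polling loop with a volatile flag is a simplification chosen for this example, not a description of how Kafka stops its handlers.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical request type; a real request carries a header, payload and connection context.
final case class DemoRequest(id: Int)

// Stand-in for the request channel: a bounded queue between network and handler threads.
final class DemoRequestChannel(capacity: Int) {
  private val queue = new ArrayBlockingQueue[DemoRequest](capacity)

  // Blocks when the queue is full, which applies back-pressure to the sender.
  def send(request: DemoRequest): Unit = queue.put(request)

  // Waits up to timeoutMs for a request; None means the wait timed out.
  def receive(timeoutMs: Long): Option[DemoRequest] =
    Option(queue.poll(timeoutMs, TimeUnit.MILLISECONDS))
}

// Stand-in for the handler pool: numThreads threads that poll the channel and apply `handle`.
final class DemoHandlerPool(channel: DemoRequestChannel, numThreads: Int, handle: DemoRequest => Unit) {
  @volatile private var running = true

  private val threads = (0 until numThreads).map { i =>
    val t = new Thread(new Runnable {
      override def run(): Unit =
        while (running) channel.receive(50).foreach(handle)
    }, s"demo-request-handler-$i")
    t.setDaemon(true)
    t.start()
    t
  }

  // Signals the threads to stop and waits for them to finish their current poll.
  def shutdown(): Unit = {
    running = false
    threads.foreach(_.join())
  }
}
```

Decoupling the two layers through a queue lets the number of handler threads be tuned independently of the network threads, which is the role config.numIoThreads plays when the real pool is constructed.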
    def startup(): Unit = {
      if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
      try {
        info("Starting broker")

        /* start scheduler */
        kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
        kafkaScheduler.startup()

        /* register broker metrics */
        _brokerTopicStats = new BrokerTopicStats

        quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))

        logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

        metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)

        // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
        // until we catch up on the metadata log and have up-to-date topic and broker configs.
        logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
          brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)

        // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
        // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
        tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
        credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

        val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
        val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)

        clientToControllerChannelManager = BrokerToControllerChannelManager(
          controllerNodeProvider,
          time,
          metrics,
          config,
          channelName = "forwarding",
          threadNamePrefix,
          retryTimeoutMs = 60000
        )
        clientToControllerChannelManager.start()
        forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager)

        val apiVersionManager = ApiVersionManager(
          ListenerType.BROKER,
          config,
          Some(forwardingManager),
          brokerFeatures,
          featureCache
        )

        // Create and start the socket server acceptor threads so that the bound port is known.
        // Delay starting processors until the end of the initialization sequence to ensure
        // that credentials have been loaded before processing authentications.
        socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
        socketServer.startup(startProcessingRequests = false)

        clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)

        val alterIsrChannelManager = BrokerToControllerChannelManager(
          controllerNodeProvider,
          time,
          metrics,
          config,
          channelName = "alterIsr",
          threadNamePrefix,
          retryTimeoutMs = Long.MaxValue
        )
        alterIsrManager = new DefaultAlterIsrManager(
          controllerChannelManager = alterIsrChannelManager,
          scheduler = kafkaScheduler,
          time = time,
          brokerId = config.nodeId,
          brokerEpochSupplier = () => lifecycleManager.brokerEpoch
        )
        alterIsrManager.start()

        this._replicaManager = new ReplicaManager(config, metrics, time, None,
          kafkaScheduler, logManager, isShuttingDown, quotaManagers,
          brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager,
          threadNamePrefix)

        /* start token manager */
        if (config.tokenAuthEnabled) {
          throw new UnsupportedOperationException("Delegation tokens are not supported")
        }
        tokenManager = new DelegationTokenManager(config, tokenCache, time , null)
        tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...

        // Create group coordinator, but don't start it until we've started replica manager.
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)

        val producerIdManagerSupplier = () => ProducerIdManager.rpc(
          config.brokerId,
          brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
          clientToControllerChannelManager,
          config.requestTimeoutMs
        )

        // Create transaction coordinator, but don't start it until we've started replica manager.
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        transactionCoordinator = TransactionCoordinator(config, replicaManager,
          new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
          producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)

        autoTopicCreationManager = new DefaultAutoTopicCreationManager(
          config, Some(clientToControllerChannelManager), None, None,
          groupCoordinator, transactionCoordinator)

        /* Add all reconfigurables for config change notification before starting the metadata listener */
        config.dynamicConfig.addReconfigurables(this)

        dynamicConfigHandlers = Map[String, ConfigHandler](
          ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, None),
          ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

        if (!config.processRoles.contains(ControllerRole)) {
          // If no controller is defined, we rely on the broker to generate snapshots.
          metadataSnapshotter = Some(new BrokerMetadataSnapshotter(
            config.nodeId,
            time,
            threadNamePrefix,
            new BrokerSnapshotWriterBuilder(raftManager.client)
          ))
        }

        metadataListener = new BrokerMetadataListener(config.nodeId,
          time,
          threadNamePrefix,
          config.metadataSnapshotMaxNewRecordBytes,
          metadataSnapshotter)

        val networkListeners = new ListenerCollection()
        config.advertisedListeners.foreach { ep =>
          networkListeners.add(new Listener().
            setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
            setName(ep.listenerName.value()).
            setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
            setSecurityProtocol(ep.securityProtocol.id))
        }
        lifecycleManager.start(() => metadataListener.highestMetadataOffset(),
          BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
            "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
          metaProps.clusterId, networkListeners, supportedFeatures)

        // Register a listener with the Raft layer to receive metadata event notifications
        raftManager.register(metadataListener)

        val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
        var interBrokerListener: Endpoint = null
        networkListeners.iterator().forEachRemaining(listener => {
          val endPoint = new Endpoint(listener.name(),
            SecurityProtocol.forId(listener.securityProtocol()),
            listener.host(), listener.port())
          endpoints.add(endPoint)
          if (listener.name().equals(config.interBrokerListenerName.value())) {
            interBrokerListener = endPoint
          }
        })
        if (interBrokerListener == null) {
          throw new RuntimeException("Unable to find inter-broker listener " +
            config.interBrokerListenerName.value() + ". Found listener(s): " +
            endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", "))
        }
        val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
          config.nodeId, endpoints, interBrokerListener)

        /* Get the authorizer and initialize it if one is specified.*/
        authorizer = config.authorizer
        authorizer.foreach(_.configure(config.originals))
        val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
          case Some(authZ) =>
            authZ.start(authorizerInfo).asScala.map { case (ep, cs) =>
              ep -> cs.toCompletableFuture
            }
          case None =>
            authorizerInfo.endpoints.asScala.map { ep =>
              ep -> CompletableFuture.completedFuture[Void](null)
            }.toMap
        }

        val fetchManager = new FetchManager(Time.SYSTEM,
          new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
            KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

        // Create the request processor objects.
        val raftSupport = RaftSupport(forwardingManager, metadataCache)
        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
          replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
          config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, quotaManagers,
          fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)

        dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
          socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
          config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
          SocketServer.DataPlaneThreadPrefix)

        if (socketServer.controlPlaneRequestChannelOpt.isDefined)
        ...