关键词:
kafka管控平台推荐使用 滴滴开源 的 Kafka运维管控平台(戳我呀) 更符合国人的操作习惯 、更强大的管控能力 、更高效的问题定位能力 、更便捷的集群运维能力 、更专业的资源治理 、更友好的运维生态 、
BliBli视频: 石臻臻的杂货铺
kafka的动态配置
文章目录
今天这篇文章,给大家分享一下最近看kafka中的动态配置,不需要重启Broker,即时生效的配置 欢迎留言一起探讨!
kafka中的配置
- Broker静态配置
.properties
文件- ZK中的动态配置
全局 default
配置- ZK中动态配置 指定配置
优先级从底到高
源码分析
1. Broker启动加载动态配置
KafkaServer.startup
1.1 启动加载动态配置总流程
1. 动态配置初始化
config.dynamicConfig.initialize(zkClient)
- 构造当前配置文件
currentConfig
, 然后从zk中获取节点/config/brokers/<default>
信息,然后更新配置updateDefaultConfig
; (动态默认配置覆盖静态配置) - 从节点
/config/brokers/当前BrokerId
获取配置, 如果配置中有ConfigType=PASSWORD
的配置(例如ssl.keystore.password
)存在,接着判断 是否存在password.encoder.old.secret
配置,(这个配置是用来加解密ConfigType=PASSWORD
的旧的秘钥),尝试用旧秘钥解密秘钥; 然后将这些配置重新加密回写入/config/brokers/当前BrokerId
; 然后返回配置 (这里主要是动态配置里面有密码类型配置的时候需要做一次解密加密处理) - 将上面得到的配置(password类型修改之后) 更新内存总的配置;优先级 静态配置<动态默认配置<指定动态配置
2. 注册可变更配置监听器
如果有对应的配置变更了,那么相应的监听器就会收到通知去修改自己相应的配置;
config.dynamicConfig.addReconfigurables(this)
DynamicBrokerConfig.addReconfigurables
// .........
def addReconfigurables(kafkaServer: KafkaServer): Unit =
kafkaServer.authorizer match
case Some(authz: Reconfigurable) => addReconfigurable(authz)
case _ =>
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
if (kafkaServer.logManager.cleaner != null)
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
addBrokerReconfigurable(kafkaServer.socketServer)
3. 动态配置启动监听
// Create the config manager. start listening to notifications
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
- 注册节点处理器
change-notification-/config/changes
= stateChangeHandler - 注册节点处理器
/config/changes
= zNodeChildChangeHandler - 获取
/config/changes
所有子节点看看有哪些变更 - 遍历所有节点并截取节点的编号, 判断一下是不是大于上一次执行过变更的节点ID
lastExecutedChange(启动的时候是-1)
- 上个条件满足的话,则执行通知操作;不同entity执行的操作不一样,具体请看下面每个类型
- 更新
lastExecutedChange
- 清除过期的通知节点, 默认过期时间
15 * 60 * 1000(15分钟)
就是删除/config/changes /
下面的过期节点
1. 2 加载Topic动态配置
TopicConfigHandler.processConfigChanges
- 获取节点的
data
数据, 如果获取到了则执行通知流程notificationHandler.processNotification(d)
,处理器是ConfigChangedNotificationHandler
; 它先解析节点的json数据,根据版本信息不同调用不同的处理方法; 下面是version=2的处理方式; - 根据json数据可以得到
entityType
和entityName
; 那么久可以去对应的zk数据里面getData获取数据; 并且将获取到的数据Decode成Properties
对象entityConfig
; - 将key为下图中的属性 隐藏掉; 替换成value: [hidden]
- 调用EntityHandler; 这里是
TopicConfigHandler.processConfigChanges
来进行处理,方法里面再看看流程->
- 从动态配置
entityConfig
里面获取message.format.version
配置消息格式版本号; 如果当前Broker的版本inter.broker.protocol.version
小于message.format.version
配置; 则将message.format.version
配置 排除掉 - 调用
TopicConfigHandler.updateLogConfig
来更新指定Topic的所有TopicPartition的配置,其实是将TP正在加载或初始化的状态标记为没有完成初始化,这将会在后续过程中促成TP重新加载并初始化 - 将动态配置和并覆盖Server的默认配置为新的 newConfig, 然后根据Topic获取对应的Logs对象; 遍历Logs去更新newConfig;并尝试执行
initializeLeaderEpochCache
; (需要注意的是:这里的动态配置不是支持所有的配置参数,请看【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)的附件部分) - 当然特殊配置如
leader.replication.throttled.replicas
,follower.replication.throttled.replicas
这两个限流相关;解析配置之后,然后通过quotaManager.markThrottled/quotaManager.removeThrottle
更新/移除对应的限流分区集合 - 如果动态配置了
unclean.leader.election.enable=true(允许非同步副本选主 )
;那么就会执行TopicUncleanLeaderElectionEnable
方法来让它改变选举策略(前提是当前Broker是Controller角色)
1.3 加载Broker动态配置
BrokerConfigHandler.processConfigChanges
假设我们配置了默认配置; zk里面的节点是<default>
sh bin/kafka-configs.sh --bootstrap-server xxxxx:9090 --alter --entity-type brokers
--entity-default
--add-config log.segment.bytes=88888888
- 从zk节点
/config/changes
里面获取变更节点的json数据.然后去对应的 /config/entityType/entituName获取对应的数据 - 如果是
<default>
节点,说明有配置动态默认配置; 则按照 静态配置<动态默认配置<动态指定配置 的顺序重新加载覆盖一下; 如果 新旧配置有变更(有可能执行了一次命令但是参数并没有变化的情况,修改了个寂寞)的情况下 才会做更新的; 并且 通知到所有的BrokerReconfigurable
; 这个就是上面启动时候 1.1 启动加载动态配置总流程的第2步骤 (注册可变更配置监听器) 注册的; - 如果是指定BrokerId, 则除了上面2重新加载覆盖之外, 相关限流 配置
leader.replication.throttled.rate
、follower.replication.throttled.rate
、replica.alter.log.dirs.io.max.bytes.per.second
都会被更新一下quotaManagers.leader/leader/alterLogDirs.updateQuota
;如果这些配置没有配置的话,则用Long.MaxValue(相当于是不限流)
来更新
2. 查询动态配置 流程 --describe
-
简单检验
-
根据类型查询
entities
; type是topics
就获取所有topic; type是broker|broker-loggers
则查询所有Broker节点 -
遍历
entities
获取配置 ;做些简单校验;然后想Broker发起describeConfigs
请求; 节点策略是LeastLoadedNodeProvider
节点调用方法KafkaApis.handleDescribeConfigsRequest
- 未经授权配置不查询
- 经过授权的配置开始查询 ;
- 当查询的是
topics
时, 去zk节点/confgi/类型/类型名
,获取到动态配置数据之后, 然后将其覆盖本地跟Log相关的静态配置, 完事之后组装一下返回;(1.数据为空过滤2.敏感数据设置value=null; ConfigType=PASSWORD和不知道类型是啥的都是敏感数据 3. 组装所有的同义配置(静态默认配置、本地静态、默认动态配置、指定动态配置、等等多个配置
))
返回的数据类型如下:
-
如果有
broker|broker-loggers
节点, 则在 获取到数据之后 然后指定nodeId节点发起describeBrokerConfigs
请求- 如果查询的是
brokers
- 如果查询的是
broker-loggers
- 如果查询的是
3. 新增/修改/删除/动态配置 的流程
1. 发起请求
- 查询当前的类型配置; 这里的查询 跟上面的
--describe
流程是一样的 - 相关校验;如果有
delete-config
配置, 需要校验一下当前配置有没有;如果没有抛出异常; - 计算出需要变更的配置之后, 发起请求
incrementalAlterConfigs
;如果请求类型是brokers/broker-loggers
则发起请求的接收方是 指定的Broker 节点; 否则就是LeastLoadedNodeProvider
(当前负载最少的节点)
2. incrementalAlterConfigs 增量修改配置
KafkaApis.handleIncrementalAlterConfigsRequest
- 通过请求参数解析 配置 configs
- 过滤一下未授权的配置
- 如果配置中有重复的项则抛出异常
Topic配置
- 获取节点 /config/topics/topicName 中的配置数据;
- 然后根据请求参数的属性 ,组装好变更后的配置是什么样的
configs
; - 简单校验一下, 并且支持自定义校验,如果有
alter.config.policy.class.name=
配置(默认null)的话,则会实例化指定的类(需要继承AlterConfigPolicy
类);并调用他的validate
方法来校验; - 调用写入zk配置的接口, 将动态配置重新写入(SetDataRequest)到接口
/config/topics/topicName
中; - 创建并写入配置变更记录顺序节点
/config/changes/config_change_序列号
中; 这个节点主要是让Broker们来监听这个节点的来了解到哪个配置有变更的;
其他的类型都一样
省略
4. Broker监听/config/changes的变更
在 1. Broker启动加载动态配置 中我们了解到有对节点/config/change
注册一个子节点变更的监听处理器
那么对动态配置做出修改之后, 这个节点就会新增一条数据,那么所有的Broker都会收到这个通知;
所以我们就要来看一看收到通知之后又做了哪些事情
这个流程是又回到了上面的 1. 2 加载Topics/Brokers动态配置 的流程中了;
源码总结
原理部分讲解比较详细的可以看 : Kafka动态配置实现原理解析 - 李志涛 - 博客园
Q&A
如果我想在我的项目中获取kafka的所有配置该怎么办?
- 启动的时候加载一次所有Broker的配置
- 监听节点
/config/change
节点的变化
是否可以直接在zk中写入动态配置?
不可以,因为Broker是监听
/config/changes/
里面的Broker节点,来实时得知有数据变更;
为什么不直接监听 /config/
下面的配置?
没有必要,这样监听的数据数据太多了,而且 你不知道具体是改了哪个配置,所以每次都要全部更新一遍,无缘无故的加重负担了, 用
/config/change
节点来得知哪个类型的数据变更, 只变更这个相关数据就可以了
关于程序国际化你必须要知道这事
银弹谷零代码开发平台V百科|新功能:关于国际化你必须要知道这事小张最近有一个烦恼!他们公司最近接了一个跨国合作的项目,小张因为业绩优秀英语功底扎实,也有多年做项目的经验,所以就被领导派去做项目负责人。但... 查看详情
线程池续:你必须要知道的线程池submit()实现原理之futuretask!(代码片段)
前言上一篇内容写了Java中线程池的实现原理及源码分析,说好的是实实在在的大满足,想通过一篇文章让大家对线程池有个透彻的了解,但是文章写完总觉得还缺点什么?上篇文章只提到线程提交的execute()方法,并没有讲解线程... 查看详情
线程池续:你必须要知道的线程池submit()实现原理之futuretask!(代码片段)
前言上一篇内容写了Java中线程池的实现原理及源码分析,说好的是实实在在的大满足,想通过一篇文章让大家对线程池有个透彻的了解,但是文章写完总觉得还缺点什么?上篇文章只提到线程提交的execute()方法,并没有讲解线程... 查看详情
关于ffmpeg-php你必须要知道的
1#PHPFFmpeg23[![BuildStatus](https://secure.travis-ci.org/PHP-FFMpeg/PHP-FFMpeg.png?branch=master)](http://travis-ci.org/PHP-FFMpeg/PHP-FFMpeg)45[![SensioLabsInsight](https://insight.sensiolabs.com/pr 查看详情
oracle!你必须要知道的knowledgepoints(入门篇)(代码片段)
一、入门oracle有四个用户,分别为sys、system、sysman和scott,其中sys是oracle权限最高的用户,类似于Linux系统的root,scott是实例用户,上课就以这个用户里的三张员工表empno、dept、salgrade作为示例来授课。启动服务1.快捷键ctrl+alt+del打... 查看详情
oracle!你必须要知道的knowledgepoints(下)(代码片段)
子查询什么是子查询当查询中的限制条件需要另一个查询提供时,我们可以把两个查询语句嵌套起来,提供条件的查询语句作为子查询。子查询,也叫内部查询,先于主查询执行,子查询的结果被用于主查询。子查询分为单行子... 查看详情
想要节省空间,你必须要知道——动态内存管理(附通讯录动态内存版源码)(代码片段)
想要节省空间,你必须要知道——动态内存管理(附通讯录动态内存版源码)1. 为什么存在动态内存分配2. 动态内存函数的介绍2.1 malloc2.2 freemalloc和free通常配合一起使用:2.3 calloc2.4 ... 查看详情
10个你必须要知道的重要javascript数组方法(代码片段)
...忙碌的初学者,我选择了10种最常见的数组方法,你必须学习它们,这些可以帮助你提升学习效率,节省时间。为了便于理解,我为每个数组方法提供了一个示例用例。01、Array.map()通过调用回调函数,map()... 查看详情
10个你必须要知道的重要javascript数组方法(代码片段)
...忙碌的初学者,我选择了10种最常见的数组方法,你必须学习它们,这些可以帮助你提升学习效率,节省时间。为了便于理解,我为每个数组方法提供了一个示例用例。01、Array.map()通过调用回调函数,map()... 查看详情
核心篇,你必须要会的dockerfile指令详解(代码片段)
...镜像的方式,其中一种方式就是基于Dockerfile构造镜像关于使用Dockerfile构造镜像的演示案例,请参阅上面的链接二、Dockerfile构建失败时会怎样?如果在构建的过 查看详情
关于flutter列表的性能优化,你必须要了解的(代码片段)
这里是坚果前端小课堂,大家喜欢的话,可以关注我的公众号“坚果前端,”,或者加我好友,获取更多精彩内容嵌套列表-ShrinkWrap与Slivers使用ShrinkWrap的列表列表下面是一些使用ListView对象呈现列表列表的代... 查看详情
深入集合类系列——你必须要知道的两棵继承树
你必须要知晓文件备份同步小技巧之rsync!(代码片段)
一、rsync概述1.1、rsync简介Rsync(remotesynchronize)是一个远程数据同步工具,可通过LAN/WAN快速同步多台主机间的文件。Rsync使用所谓的“Rsync算法”来使本地和远程两个主机之间的文件达到同步,这个算法只传送两个... 查看详情
你必须要知晓文件备份同步小技巧之rsync!(代码片段)
一、rsync概述1.1、rsync简介Rsync(remotesynchronize)是一个远程数据同步工具,可通过LAN/WAN快速同步多台主机间的文件。Rsync使用所谓的“Rsync算法”来使本地和远程两个主机之间的文件达到同步,这个算法只传送两个... 查看详情
你必须要知道的雪崩穿透预热更新降级
以前总觉得,写得一手好Java代码,走遍天下都不怕,后来随着时间得推移,才意识到程序的高效性流畅性才是最重要最重要的。所有的技术都是前人不断实践突破革新留下的产物,它们的存在以及运用一定有... 查看详情
你必须要知道的10款app开发框架
对于大部分Web开发人员,HTML、CSS和Javascript是他们最熟练的开发技能。然而,开发一个原生的移动App,对他们来说却是完全陌生的领域。因为开发Android,iOS或WindowsPhone上的原生App(app开发公司ty300.com),需要掌握完全不同的开发... 查看详情
你必须要会的测试用例基本功(代码片段)
摘要随着软件系统规模的持续增大,业务复杂度的持续增加,软件测试的复杂度也随之越来越大。而软件测试工作复杂度的直接体现就是测试用例编写、维护、执行和管理,所以编写易读、易维护和易管理的测试用例... 查看详情
你必须要会uvloop!让pythonasyncio异步编程性能直逼go协程性能(代码片段)
背景最近我在思考后端优化的事情,了解到了uvloop,它是python原生asyncio事件循环的替代品。先介绍下asyncioPython从3.4开始,引入了asyncio库,参考PEP-3156。Python从3.5开始,引入了async和await语法,参考PEP-0492。Python中的用法其实跟java... 查看详情