(2021年4月20日)kafka详解(代码片段)

Mr.DreamerZ Mr.DreamerZ     2022-12-06     283

关键词:

kafka详解

kafka是一个分布式的发布/订阅的消息队列

1.kafka的核心组件


Producer:消息生产者,产生的消息会被发送到某个topic。主动将消息发给broker。
Consumer:消息消费者,消费的消息内容来自于某个topic。主动向topic拉取消息(因为消费水平不同,不能指定推给consumer)。
Topic:消息根据topic进行归类,topic本质上是一个目录,将同一主题消息归类到同一个目录。
Broker:每台kafka服务器节点就是一个broker,一个broker可以有多个topic。
Zookeeper:zookeeper集群不属于kafka内的组件,但kafka依赖zookeeper集群保存meta信息。以及leader的选举和follower的同步

2.kafka数据处理步骤

1.producer产生消息,发送到broker中。
2.leader状态的broker接收消息,写入到相应的topic。
3.leader状态的broker接收完毕以后,传给follow状态的broker作为副本备份。
4.consumer消费broker中的消息。

3.kafka名词解释和工作方式

Producer
消息生产者,就是像broker发送消息的客户端

Consumer
消息消费者,就是像broker拉去消息的客户端

Topic
本质上是一个目录

Consumer Group(CG)
消费者组,这个是kafka用来实现topic消息的广播(发送给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partition只会把消息发给该CG中的一个consumer。如果需要实现广播,只需要让每个consumer有一个独立的CG就可以。要实现单播只要所有的consumer在同一个CG中即可。

Broker
一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

Partition
为了实现扩展性,一个非常大的topic可以分布到多个broker(服务器)上,一个topic可以分为多个partition。每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只需要安一个partition中的顺序将消息发给consumer,不保证一个topic整体的顺序(多个partition之间)。

4.kafka和topic的关系

kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。producer生产的数据会被不断追加到该log文件的末尾,且每一条数据都有它自己的offset。消费组中的每个消费者,都会实时记录自己消费到哪个offset,以便出错恢复时,从上次的位置继续消费。

由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件——".index"文件和".log"文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如:first这个topic有三个分区,则对应的文件夹名称为first-0,first-1,first-2。

index和log文件以当前segment的第一条消息的offset命名。

“.index"文件存储大量的索引信息,”.log"文件存储大量的数据

5.Consumer和topic的关系

kafka只支持topic

每个group中可以有多个consumer,每个consumer属于一个consumer group。通常情况下,一个group中会包含多个consumer,这个不仅可以提高topic中消息的并发消费能力,而且还能提高“故障容错”性,如果group中的某个consumer失效,那么其消费的partition将会有其他consumer自动接管。

对于topic中的一条特定的消息,只会被订阅此topic中的某一个group中的其中一个consumer消费,此消息不会发送给一个group中的多个consumer;

在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);一个topic中的每个partition,只会被一个group中的一个consumer消费,但是一个consumer可以同时消费多个partition中的消息。

kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从topic角度来说,当有多个partition时,消费仍然不是全局有序的。

注意:同一topic中partition的Leader和Follower不会在同一个broker中。


消费组和分区的关系





如部分上图所示,可以看出,一个主题中的分区必须都被消费。但是一个topic中的分区最多只能被消费者群组中的一个消费者消费

6.kafka消息的发布

Producer客户端负责消息的发布

kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含“集中存活的servers列表”、“partitions leader列表”等信息;

当producer获取到metadata信息之后,producer将会和topic下所有的partition leader保持socket连接;

消息由producer直接通过scoket发送到broker,中间不会经过任何"路由层"。事实上,消息被路由到哪个partition上由producer客户端决定,比如采用"随机",“轮训”,"key-hash"等。

如果一个topic中有个多个partition,那么在producer端实现"消息均衡分发"是必要的。

在producer端的配置文件中,开发者可以指定partition的路由方式。

producer消息发送的应答机制
设置发送数据是否需要服务端的反馈,有三个值0,1,-1
0:producer不会等待broker发送ack,producer发送之后就不管了;
1:当leader接收到消息之后,等待leader落盘成功后,发送ack;
-1:等待leader和follower都落盘成功后,发送ack;此方式不会丢失消息,但是可能会导致消费者重复消费;

request.required.acks=0

这里说明一下,很多面试都会问到避免消息丢失和重复消费。
先说消息丢失,这个通常是在生产者方面丢消息给broker。此时ack设置为-1,会让消息落盘到leader和follower。之后相应ack。
再来说下重复消费:
生产者方面
重复消费其实在生产者方面就会出现,比如出现异常但是offset没有及时修改之类的。此时我们可以在producer方引入幂等性操作,幂等性操作提供了PID(producerID)和sequenceNum ID(决定topic和partition)
ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。

enable.idempotence=true

消费者方面:
手动提交偏移量(可以解决,但是不建议)
如果在生产者方面没有做幂等操作,那么就需要在消费时创建一个表来进行重复排查。

7.Conusmer的负载均衡

当一个group中,有consumer加入或者离开时,会触发partition均衡,均衡的最终目的是提高topic的并发消费能力,步骤:
1.加入topic1,具有partition:P0,P1,P2,P3
2.加入groupA,有如下consumer:C0,C1
3.首先根据partition索引号对partition排序:P0,P1,P2,P3
4.根据consumer.id排序:C0,C1
5.计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6.然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

8.副本数据同步

8.1 同步策略


第二种情况,当ack=-1时,如果所有leader和follower都落盘之后再返回ack。那么会出现该问题:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那么leader就要一直等待下去。那么我们该怎么办呢?
Leader维护一个动态的in-sync relica set(ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则将follower踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会ISR中选举新的leader。
我们可以想想为什么要用时间来来进行判断呢,而不能用leader和follower同步之后相差的条数来进行判定踢出的条件呢?这样不是更方便吗?
试想一下,如果由条数作为条件。那么如果follower向leader拉取条数时条数拉取的较少,而导致follower同步的时候就达到了leader和follower的阈值条件,导致ISR直接丢掉。

ISR : 速率和leader相差低于10秒的follower的集合
OSR : 速率和leader相差大于10秒的follower/集合
AR : 所有分区的follower,AR=ISR+OSR

8.2 故障处理细节


follower故障:
follower发生故障之后会被踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分全部截取,从HW开始向leader进行同步。等待follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以重新加入ISR了。
注意:截取该ISRHW后面部分,丢掉(为了避免错误)。由于之前已经读取了HW,所以只需要向leader的HW开始同步即可。

leader故障:
leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:截取HW后面部分,丢掉。然后从新的leader的HW开始同步。

LEO:指的是每个副本最大的offset。
HW:指的是消费者能见到的最大的offset,ISR中最小的LEO

9.zookeeper在kafka中的作用

kafka集群中有一个broker会被选举为controller,负责管理集群broker上下线,所有topic的分区副本分配和leader选举等工作。
而Controller的管理工作都是依赖于zookeeper。

10.消费发送方式

1.异步发送(默认)
2.同步发送
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Future对象的特点,我们也可以实现同步发送的效果,只需要在调用Future对象的get方法即可。

11.消息提交方式

11.1 自动提交

enable.auto.commit:true
注意,如果为false,说明我们不需要由kafka自动进行提交偏移量。因此auto.commit.interval.ms(延时提交)也会失效。

11.2 手动提交

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是:commitSync阻塞当前线程,一直到提交成功,并且失败会重试;而commitSync则没有重试机制,故有可能提交失败。
其实无论自动提交或者手动提交(同步提交还是异步提交)offset,都可能造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能会造成数据重复消费。
所以,一般比较手动提交保险的办法就是使用异步+同步的方式进行提交。

12.kafka为什么这么快

1.partition 并行处理
2.顺序写磁盘,充分利用磁盘特性
3.利用了现代操作系统分页存储 Page Cache 来利用内存提高 I/O 效率
4.采用了零拷贝技术
5.Producer 生产的数据持久化到 broker,采用 mmap 文件映射,实现顺序的快速写入
6.Customer 从 broker 读取数据,采用 sendfile,将磁盘文件读到 OS 内核缓冲区后,转到 NIO buffer进行网络发送,减少 CPU 消耗

对应面试题

2020年4月21日上机实习(代码片段)

程序源码:programmainimplicitnoneintegeri,sumrealt(7),tmaxtmax=0.0;sum=0doi=1,7read*,t(i)if(t(i)>=tmax)thentmax=t(i)endifif(t(i)>20)thensum=sum+1endifenddoprint*,‘日最高气温的最大值为:‘print*,‘tmax=‘,tmaxprint*,‘日最高气温所在的日期为:‘doi=1,7if(t(i)==... 查看详情

vbscript有用的宏观:[2015年1月20日](代码片段)

查看详情

text芝加哥代码2018年4月4日更新(代码片段)

查看详情

买方团延长和利时股东提交同意的截止日期至2021年8月20日

...#xff0c;以供和利时自动化科技有限公司(纳斯达克证券代码:HOLI)(“公司”& 查看详情

实验报告(2019年4月31日)(代码片段)

c程序实验报告姓名:黄志乾????实验地点:教学楼514教室????实验时间:4月30日实验项目:???1、编写由三角形三边求面积的函数???2、编写求N阶乘的函数???3、求两个整数的最大公约数???4、打印输出指定图形???5、求500以内所有亲密... 查看详情

leetcode周赛2021年5月9日(代码片段)

太久不写代码了第一次有空做LeetCode周赛不是下标越界就是暴力超时或者是明明很简单的题目就是懒得写比较麻烦就不动手看别人的分享学习一下第一题活着的人最多的年份classSolutionpublic:intyr[3000];intmaximumPopulation(vector<vector<... 查看详情

casestudy-20180913-kafka进程挂掉&解决办法

...18年xx月xx日下午4点20分左右xxx无意中看到xxx正在排查线上Kafka集群遇到的问题,随后问明情况,有一台机器上Kafka进程挂了,当时他正在lark平台上查看错误日志信息,随后我一起加入排查问题。事故起止时间:2018年xx月xx日16时30... 查看详情

游戏引擎开发日志(第四天2021年6月8日)(代码片段)

第四天2021年6月8日上一天(第三天)的地址:https://blog.csdn.net/z736248591/article/details/117266221最近有点忙,快到期末了,很多课程都要结课,作业巨多。但是时间是挤出来的。继续之前的交换链函数的完成。... 查看详情

2018年4月14日笔记(代码片段)

函数关键字def  函数声明return  返回值pass  略过,啥也不干exit(1)  直接退出 高阶函数:可接收另一个函数作为参数的函数常用的高阶函数有:map()reduce()filter()sorted()  ->最常用 map()函数:接收两个参数,一... 查看详情

如何更改日期字符串格式(2052 年 10 月 20 日 -> 2052-10-20)

】如何更改日期字符串格式(2052年10月20日->2052-10-20)【英文标题】:HowtochangeDateStringFormat(20thOct2052->2052-10-20)【发布时间】:2019-06-1523:25:54【问题描述】:块引用我有一个格式为日月年的日期字符串,例如1984年3月1日、1973... 查看详情

实验报告(2019年4月30日)下半部分(代码片段)

c程序实验报告姓名:黄志乾????实验地点:教学楼514教室????实验时间:4月30日实验项目:???1、利用复化梯形公式计算定积分???2、计算Ackerman函数???3、编写计算x的y次幂的递归函数getpower(intx,inty)。并在主程序中实现输入输出。???4... 查看详情

csp2020儒略日题解(代码片段)

题目大意:求公元前4713年1月1日经过r天后的日期,公元1582年10月4日以前适用儒略历,公元1582年10月15日以后适用格里高利历?q次询问,(qleq10^5)这题就我目前所知有三种做法:做法一大概就是先把儒略历和格里高利历的分界点判... 查看详情

2018年4月26日笔记(代码片段)

内置模块:hashlibPython的hashlib提供了常见的摘要算法,如md5,sha1,sha224,sha256,sha384,sha512等等,其中md5最为常用。什么是摘要算法呢?摘要算法又称哈希算法、散列算法。它通过一个函数,把任意长度的数据转换为一个长度固定的... 查看详情

一文详解自动驾驶的运行设计域(odd)|自动驾驶系列

文章版权所有,未经授权请勿转载或使用 相关标准全文下载,关注本公众号回复“210901”即可 2021年4月30日,SAE发布了第四版J3016《驾驶自动化分级》,这是即2014年1月16日、2016年9月30日、2018年6月15日之后,J... 查看详情

2018年4月28日笔记(代码片段)

正则表达式  数量词的贪婪模式与非贪婪模式Python中数量词默认是贪婪的,总是尝试匹配尽可能多的字符例如,正则表达式"ab*"如果用于查找"abbbc",将匹配到"abbb";如果是非贪婪方式,则会匹配到"a"注意:+或*后跟?表示... 查看详情

meetupno.1|starrocks技术详解

MeetupNo.1StarRocks技术详解时间:2021年9月16日,周四19:00-20:0019:00-20:00StarRocks技术详解 赵纯,StarRocks CTO  内容简介:StarRocks9月8日宣布开放源代码,不知道你是否已经将代码撸过一遍了呢?本次StarRocksCTO将... 查看详情

meetupno.1|starrocks技术详解

MeetupNo.1StarRocks技术详解时间:2021年9月16日,周四19:00-20:0019:00-20:00StarRocks技术详解 赵纯,StarRocks CTO  内容简介:StarRocks9月8日宣布开放源代码,不知道你是否已经将代码撸过一遍了呢?本次StarRocksCTO将... 查看详情

[转帖]chrome浏览器历史版本(代码片段)

 微软马上就终止支持win7了但是据说google还要继续支持18个月.现在chrome的版本时78估计chrome在win7上面最终支持的版本回事chrome90如下面试winxpgoogle与2016年4月份停止支持时候chrome49的版本.https://www.cnblogs.com/snailrunning/p/9136672.html&n... 查看详情