消息队列之kafka从入门到小牛(代码片段)

AmoXiang AmoXiang     2022-12-18     135

关键词:

一、Kafka 介绍

1.1 消息队列的介绍

在没有使用消息系统以前,我们对于许多的传统业务,以及跨服务器传递消息的时候,会采用 串行方式或者并行方法; 串行方式如下:用户注册实例:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。

并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。

以上三个任务完成之后,响应给客户端,与串行的差别是并行的方式可以缩短程序整体处理的时间。
消息系统: 消息系统 负责将数据从一个应用程序传送到另一个应用程序,因此应用程序可以专注于数据,但是不必担心如何共享它。分布式消息系统基于可靠的消息队列的概念,消息在客户端应用程序和消息传递系统之间的异步排队。有两种类型的消息模式可用

(1) 点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到 Queue 中,然后消息消费者从 Queue 中取出并且消费消息。消息被消费以后,Queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

(2) 发布/订阅模式(一对多,消费者消费数据之后不会清除消息) 消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅) 消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。

使用消息队列的好处:

  1. 解耦。允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
  2. 可恢复性。系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
  3. 缓冲。有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
  4. 灵活性 & 峰值处理能力。在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。举个例子: 秒杀活动当中,一般会因为流量过大,应用服务器挂掉,为了解决这个问题,一般需要在应用前端加上消息队列以控制访问流量。1、可以控制活动的人数 可以缓解短时间内流量大使得服务器崩掉。2、可以通过队列进行数据缓存,后续再进行消费处理。
  5. 异步通信。很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

1.2 Kafka 简介

Kafka 是最初由 Linkedin 公司开发,是一个分布式、支持分区的、多副本的、多订阅者的,基于 ZooKeeper 协调的分布式日志系统,常见可以用于 web/nginx 日志、访问日志、消息服务等,Linkedin 公司于2010年将 Kafka 贡献给了 Apache基金会并成为顶级开源项目。

说明:Jay Kreps,是 Linkedin 公司的一名在线数据架构技术高管,负责 Kafka 项目。Kafka 是以一位小说家的名字命名,因为 Kafka 是 a system optimized for writing(一个用于优化写作的系统),同时 Jay Kreps 很喜欢 Kafka 的作品。

Kafka 最大的特性就是支持分布式,可以实时的处理大量数据以满足各种需求场景,比如基于 Hadoop 的批处理系统、低延迟的实时系统、storm/Spark 流式处理引擎,web/nginx日志、访问日志、消息服务等。除此以外,Kafaka 还有以下特性:

  1. 高吞吐量,高性能,低延迟:Kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。
  2. 高容错性,运行集群中节点失败,若副本数量为 n,则运行 n-1 个节点失败。
  3. 高持久性,高可靠性:消息被持久化到本地磁盘,并且支持数据备份,防止数据丢失。
  4. 高并发:支持数千个客户端同时读写。
  5. 有效地安全机制:Kafka 具有如下几种安全措施,① 通过SSL和SASL(Kerberos),SASL/PLAIN验证机制支持生产者、消费者与代理连接时的身份认证;② 支持代理与 ZooKeeper 连接身份验证;③ 通信时数据加密;④ 客户端读、写权限认证;⑤ Kafka 支持与外部其他认证授权服务的集成。

Kafka 的体系结构如下图所示:

Kafka 中的名词解析:

  1. 消息中间件。 通过前面的介绍,读者可能只看懂了 Kafka 是消息中间件,但中间件具体是什么,有什么作用并不知道,下面通过一个例子来介绍什么是消息中间件。Kafka 有两个最重要的核心,就是生产者与消费者。例如,生产者生产包子,消费者消费包子;生产者生产一个包子,消费者就消费一个包子。假设消费者消费包子的时候噎住了,但生产者还在生产包子,那新生产的包子就浪费了。再比如另一种情况,生产者很厉害,1分钟能生产100个包子,但消费者1分钟只能吃10个包子,一段时间之后,消费者就吃撑了,拒绝再吃,那么剩下的包子又浪费了。这个时候如果在生产者和消费者之间放个盘子,生产出的包子都放到盘子中,消费者去盘子里拿包子,这样剩下的包子就不会浪费了。那么这个盘子就是 Kafka,如下图所示:

    上面例子中出现的场景对应在 Kafka 集群的哪种情况。1、“消费者消费包子的时候噎住了”:系统宕机。2、“生产者很厉害,1分钟能生产100个包子”:数据交易量巨大。3、“消费者就吃撑了,拒绝再吃”:消息堵塞,导致系统超时。包子就相当于是数据流,系统之间的交互都是通过数据流来传输的,也叫做“消息”。
  2. 消息。 即上面例子中的包子。消息是 Kafka 通信的基本单位,由一个固定长度的消息头和一个可变长度的消息体构成。在老版本中,每一条消息称为 Message;在 java 实现的客户端中,每一条消息称之为 Record。
  3. 生产者(Producer)。 生产包子的,即将消息写入 Kafka 集群。
  4. 消费者(Consumer)。 消费生产出的包子。从 Kafka 集群中读取消息。
  5. 主题(topic)。 主题,相当于每个生产者生产出的包子都有自己的品牌,消费者可不是谁生产的包子都吃的,这样不同的生产者生产出来的包子,消费者就可以根据品牌的不同,选择性地吃了。Kafka 将一组消息抽象归纳为一个主题,也就是说,一个主题就是对消息的分类。生产者将消息发送到特定的主题,消费者订阅主题或主题的某些分区进行消费。主题有多个消费者,即一个主题可以有零个、一个或多个消费者来读取数据。
  6. 分区(Partition)。 Kafka 将一组消息归纳为一个主题,而每个主题又被分成一个或多个分区。每个分区由一系列有序、不可变消息组成,是一个有序队列。每个分区在物理上对应为一个文件夹,分区的命名规则为主题名称后接“一”连接符,之后再接分区编号,分区编号从0开始。每个主题对应的分区数可以在 Kafka 启动时加载配置文件中配置、也可以在创建主题时指定、还可以在修改主题时修改分区数。分区使得 Kafka 在并发处理上变得更加容易,理论上来说,分区数越多吞吐量越高。同时分区也是 Kafka 保证消息被顺序消费以及对消息进行负载均衡的基础。Kafka 只能保证一个分区内消息的有序性,并不能保证跨分区消息的有序性。每条消息被追加到相应的分区中,是顺序写磁盘,因此效率非常高。
  7. 副本(Replication)。 每个分区又有一个至多个副本,分区的副本分布在集群的不同代理上,以提高可用性。从存储的角度上分析,分区的每个副本在逻辑上可以抽象为一个日志(Log)对象。在创建主题时,可以指定副本数,通常设置为3,以防数据丢失。注意:分区数可以多个,可以大于 Broker 数(Kafka 集群中 Kafka 的节点数),即一个 Broker 可以包含多个分区;但分区的副本数不能多于 Broker 数。
  8. 节点(Broker)。 Kafka 集群中,一个 Kafka 节点被称为 Broker,多个 Broker 组成一个 Kafka 集群。

1.3 安装 Kafka 集群

Kafka 可以安装单机版,也可以安装集群版,即安装在集群上,本章主要介绍 Kafka 的集群安装。Kafka 是由 Scala 写成,Scala 运行在 Java 虚拟机上,并兼容现有的 Java 程序,因此部署 Kakfa 的时候,需要先部署 JDK 环境,笔者安装 Kafka 集群前的必备基础软件如下表所示:

软件名称版本
JDK1.8.0_202
Zookeeperzookeeper-3.5.8
Scala2.12.11

Linux 下 JDK 安装:https://blog.csdn.net/xw1680/article/details/115434353
Zookeeper 集群安装:https://blog.csdn.net/xw1680/article/details/118002073
Kafka 的官方下载网站为:http://kafka.apache.org/downloads,下载步骤如下:

(1) 选择适合的 Kafka 版本进行下载。Kafka 的安装包有两种,一种 xxx-src.tgz,为 Kafka 的源代码,需要自行编译安装,较为灵活;另一种 xxx.tgz,为已经编译好的,可以直接使用的安装包。本文中选择第二种,已经被编译过的安装包进行下载安装,如下图所示:

以安装文件名 “kafka_2.12-2.4.1.tgz (asc, sha512)” 为例,其中 2.12 为 Scala 的版本,2.4.1 是 Kafka 的版本。由于笔者安装的 Scala 的版本是 2.12.11,所以需要下载的是名为 “kafka_2.12-2.4.1.tgz (asc, sha512)” 的安装文件。

(2) 进行下载。确定了要下载的 kafka 版本之后,单击 “kafka_2.12-2.4.1.tgz (asc, sha512)” 链接,直接下载,如下图所示:

下载好的 Kafka 安装包名为 “kafka_2.12-2.4.1.tgz”,将该安装包复制到 bigdata01 节点的 “/data/soft” 目录下,准备安装。注意:Kafka 在启动的时候不需要安装 Scala 环境,只有在编译源码的时候才需要,因为运行的时候是在 JVM 虚拟机上运行的,只需要有 JDK 环境就可以了。

注意:在安装 Kafka 之前需要先确保 Zookeeper 集群是启动状态。分别在 bigdata01、bigdata02、bigdata03 上启动:bin/zkServer.sh start。

安装 Kafka:

  1. 集群节点规划,使用三个节点搭建一个 Kafka 集群。分别为 bigdata01、bigdata02、bigdata03。注意:针对 Kafka 集群而言,没有主从之分,所有节点都是一样的。
  2. 首先在 bigdata01 节点上配置 Kafka。解压:tar -zxvf kafka_2.12-2.4.1.tgz。修改配置文件,此时针对集群模式需要修改 broker.id、log.dirs、以及 zookeeper.connect。进入:cd kafka_2.12-2.4.1/config/、vi server.properties
    • broker.id 的值默认是从0开始的,集群中所有节点的 broker.id 从 0 开始递增即可,所以 bigdata01 节点的 broker.id 值为0。
    • log.dirs 的值建议指定到一块存储空间比较大的磁盘上面,因为在实际工作中 Kafka 中会存储很多数据,笔者虚拟机里面就一块磁盘,所以就指定到 /data 目录下面了。
    • zookeeper.connect 的值是 zookeeper 集群的地址,可以指定集群中的一个节点或者多个节点地址,多个节点地址之间使用逗号隔开即可。
  3. 将修改好配置的 Kafka 安装包拷贝到其它两个节点。 scp -rq kafka_2.12-2.4.1 bigdata02:/data/soft/、scp -rq kafka_2.12-2.4.1 bigdata03:/data/soft/。
  4. 修改 bigdata02 和 bigdata03 上 Kafka 中 broker.id 的值。修改 bigdata02 节点上的 broker.id 的值为 1、修改 bigdata03 节点上的 broker.id 的值为 2。

  5. 启动集群。分别在 bigdata01、bigdata02、bigdata03 上启动 Kafka 进程,[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties、[root@bigdata02 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties、[root@bigdata03 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties
  6. 验证。分别在 bigdata01、bigdata02、bigdata03 上执行 jps 命令验证是否有 Kafka 进程,如果都有就说明 Kafka 集群启动正常了。

二、Kafka 使用初体验

Kafka 中最重要的就是主题(topic),在搭建好 Kafka 集群后,首先要做的就是创建消息主题。主题相当于文件系统的目录,就是用来保存消息内容的实体,像目录通过目录名标识一样,主题也是通过主题名来进行标识的。例如,在 bigdata01 节点上创建一个名为 demo 的主题(指定2个分区,2个副本)。进入 Kafka 安装目录,执行如下代码:

bin/kafka-topics.sh --create --zookeeper bigdata01:2181 --partitions 2 --replication-factor 2 --topic demo

注意:副本数不能大于集群中 broker 的数量,因为每个 partition 的副本必须保存在不同的 broker,否则没有意义,如果 partition 的副本都保存在同一个 broker,那么这个 broker 挂了,则 partition 数据依然会丢失。在这里笔者使用的是3个节点的 Kafka 集群,所以副本数我就暂时设置为2,最大可以设置为3,如果读者用的是单机 Kafka 的话,这里的副本数就只能设置为1了,最后,当结果中出现 “Created topic demo.” 时,则说明创建主题成功了。

查看已创建的主题: 可以通过 list 命令来显示主题,在 bigdata01 节点的终端中输入如下命令:

bin/kafka-topics.sh --list --zookeeper localhost:2181

其实,还可以通过显示主题名的方式来验证集群,例如在 bigdata02 节点和 bigdata03 节点上分别执行 “查看 bigdata01 节点的 Kafka 主题名” 命令,如果结果显示为 demo,则说明 Kafka 集群已成功搭建。

查看指定topic的详细信息:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo

查询结果如下图所示:

  1. 第一行显示指定 topic 所有 partitions 的一个总结。PartitionCount:表示这个 Topic 一共有多少个 Partition。ReplicationFactor:表示这个 topic 中 Partition 的副本因子是几。Config:这个表示创建 Topic 时动态指定的配置信息,在这没有额外指定配置信息。
  2. 下面每一行给出的是一个 Partition 的信息,如果只有一个 Partition,则只显示一行。Topic:显示当前的 topic 名称、Partition:显示当前 topic 的 partition 编号、Leader:Leader partition 所在的节点编号,这个编号其实就是 broker.id 的值,来看这个图:

    这个图里面的 hello 这个 topic 有两个 Partition,其中 Partition1 的 leader 所在的节点是 Broker1,Partition2 的 Leader 所在的节点是 Broker2、Replicas:当前 partition 所有副本所在的节点编号【包含 Leader 所在的节点】,如果设置多个副本的话,这里会显示多个,不管该节点是否是 Leader 以及是否存活。Isr:当前 partition 处于同步状态的所有节点,这里显示的所有节点都是存活状态的,并且跟 Leader 同步的(包含 Leader 所在的节点)。所以说 Replicas 和 Isr 的区别就是:如果某个 partition 的副本所在的节点宕机了,在 Replicas 中还是会显示那个节点,但是在 Isr 中就不会显示了,Isr 中显示的都是处于正常状态的节点。

修改Topic:修改 Topic 的 partition 数量,只能增加。 因为数据是存储在 partition 中的,如果可以减少 partition 的话,那么 partition 中的数据就丢了。

bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 5 --topic demo

修改之后再来查看一下 topic 的详细信息:

删除Topic:删除 Kafka 中的指定 Topic:

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic demo

删除操作是不可逆的,删除 Topic 会删除它里面的所有数据,注意:Kafka 从 1.0.0 开始默认开启了删除操作,之前的版本只会把 Topic 标记为删除状态,需要设置 delete.topic.enable 为 true 才可以真正删除。如果不想开启删除功能,可以设置 delete.topic.enable 为 false,这样删除 topic 的时候只会把它标记为删除状态,此时这个 topic 依然可以正常使用。delete.topic.enable 可以配置在 server.properties 文件中。

前面我们学习了 Kafka 中的 topic 的创建方式,下面我们可以向 topic 中生产数据以及消费数据了,生产数据需要用到生产者,消费数据需要用到消费者,Kafka 默认提供了基于控制台的生产者和消费者,方便测试使用,如下:

建一个消息生产者,利用它来产生消息。在 Kafka 的安装目录下,执行如下命令:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello

执行结果如下图所示:

在执行上面的命令后,系统会等待用户输入信息。在创建了生产者之后,终端会一直处于生产消息的状态,用户可以一直输入消息,输入的消息会保存到消息主题当中。在创建了生产者生产了消息之后,需要在另一个终端上创建消息消费者,用来接收消息,命令如下:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

发现消费不到刚才生产的数据,为什么呢?因为 kafka 的消费者默认是消费最新生产的数据,如果想消费之前生产的数据需要添加一个参数 --from-beginning,表示从头消费的意思,如下:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning

三、Kafka 核心扩展内容

3.1 Broker 扩展

Broker 的参数可以配置在 server.properties 这个配置文件中,Broker 中支持的完整参数在官方文档中有体现:

具体链接为:https://kafka.apache.org/documentation/#brokerconfigs
针对 Broker 的参数,主要分析两块:

  1. Log Flush Policy:设置数据 flush 到磁盘的时机。为了减少磁盘写入的次数,Broker 会将消息暂时缓存起来,当消息的个数达到一定阀值或者过了一定的时间间隔后,再 flush 到磁盘,这样可以减少磁盘 IO 调用的次数。这块主要通过两个参数控制:log.flush.interval.messages

    一个分区的消息数阀值,达到该阈值则将该分区的数据 flush 到磁盘,注意这里是针对分区,因为 topic 是一个逻辑概念,分区是真实存在的,每个分区会在磁盘上产生一个目录,如下图所示:

    这个参数的默认值为 9223372036854775807,long 的最大值
    默认值太大了,所以建议修改,可以使用 server.properties 中针对这个参数指定的值 10000,需要去掉注释之后这个参数才生效。

    第二个参数:log.flush.interval.ms,间隔指定时间,默认间隔指定的时间将内存中缓存的数据 flush 到磁盘中,由文档可知,这个参数的默认值为 null,此时会使用 log.flush.scheduler.interval.ms 参数的值,

    log.flush.scheduler.interval.ms 参数的值默认是 9223372036854775807,long 的最大值,所以这个值也建议修改,可以使用 server.properties 中针对这个参数指定的值1000,单位是毫秒,表示每1秒写一次磁盘,这个参数也需要去掉注释之后才生效。
  2. Log Retention Policy:设置数据保存周期,默认7天,Kafka 中的数据默认会保存7天,如果 Kafka 每天接收的数据量过大,这样是很占磁盘空间的,建议修改数据保存周期,之前在实际工作中是将数据保存周期改为了1天。数据保存周期主要通过这几个参数控制:log.retention.hours,这个参数默认值为168,单位是小时,就是7天,可以在这调整数据保存的时间,超过这个时间数据会被自动删除。

    log.retention.bytes:这个参数表示当分区的文件达到一定大小的时候会删除它,如果设置了按照指定周期删除数据文件,这个参数不设置也可以,这个参数默认是没有开启的。

    log.retention.check.interval.ms:这个参数表示检测的间隔时间,单位是毫秒,默认值是300000,就是5分钟,表示每5分钟检测一次文件看是否满足删除的时机。

3.2 Producer 扩展

Kafka 分区的原因:

  1. 方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
  2. 可以提高并发,因为可以以 Partition 为单位读写了。

Kafka 分区的原则: 我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。

  1. 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
  2. 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
  3. 既没有 partition 值又没有 key 值的情况下, kafka 采用 Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的 batch 已满或者已完成,kafka 再随机一个分区进行使用。

针对 producer 的数据通讯方式:同步发送和异步发送。 同步是指:生产者发出数据后,等接收方发回响应以后再发送下个数据的通讯方式。异步是指:生产者发出数据后,不等接收方发回响应,接着发送下个数据的通讯方式。具体的数据通讯策略是由 acks参数 控制的。

acks 默认为1,表示需要 Leader 节点 回复收到消息,这样生产者才会发送下一条数据,如果在 follower 同步成功之前 Leader 故障,那么将会丢失数据。

acks:all(-1),表示需要所有 Leader+副本节点 回复收到消息(acks=-1),这样生产者才会发送下一条数据,但是如果在 follower 同步完成后,broker 发送 acks 之前,Leader 发生故障,那么会造成数据重复。

acks:0,表示不需要任何节点回复,生产者会继续发送下一条数据。再来看一下这个图:

在向 hello 这个 topic 生产数据的时候,可以在生产者中设置 acks 参数,acks 设置为1,表示我们在向 hello 这个 topic 的 partition1 这个分区写数据的时候,只需要让 leader 所在的 Broker1 这个节点回复确认收到的消息就可以了,这样生产者就可以发送下一条数据了。如果 acks 设置为 all,则需要 partition1 的这两个副本所在的节点(包含Leader)都回复收到消息,生产者才会发送下一条数据,如果 acks 设置为0,表示生产者不会等待任何 partition 所在节点的回复,它只管发送数据,不管你有没有收到,所以这种情况丢失数据的概率比较高。

副本数据同步策略如下:

方案优点缺点
半数以上完成同步,就发送acks延迟低选举新的leader时,容忍n台节点的故障,需要2n+1个副本
全部完成同步,才发送acks选举新的leader时,容忍n台节点的故障,需要n+1个副本延迟高

Kafka 选择了第二种方案,原因如下:

  1. 同样为了容忍 n 台节点的故障,第一种方案需要 2n+1个副本,而第二种方案只需要 n+1个副本,而 Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
  2. 虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。

ISR: 采用第二种方案之后,设想以下情景:leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 acks。这个问题怎么解决呢?Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 producer 发送 acks。如果 follower 长时间未向 leader 同步数据,则该 follower 将被踢出 ISR,该时间阈值由 replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。

Leader和Follower故障处理细节:

LEO:指的是每个副本最大的 offset;
HW:指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。

  1. Follower 故障。Follower 发生故障后会被临时踢出 ISR,待该 Follower 恢复后,Follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 Leader 进行同步。等该 Follower 的 LEO 大于等于该 Partition 的 HW,即 Follower 追上 Leader 之后,就可以重新加入 ISR 了。
  2. Leader 故障。Leader 发生故障之后,会从 ISR 中选出一个新的 Leader,之后,为保证多个副本之间的数据一致性,其余的 Follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 Leader 同步数据。注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

Exactly Once 语义: 将服务器的 acks 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义。相对的,将服务器 acks 级别设置为0,可以保证生产者每条消息只会被发送一次,即 At Most Once 语义。At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的,At Most Once 语义可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。在 0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语义,就构成了 Kafka 的 Exactly Once 语义。即:At Least Once + 幂等性 = Exactly Once 要启用幂等性,只需要将 Producer 的参数中 enable.idempotence 设置为 true 即可。Kafka 的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在初始化的时候会被分配一个PID,发往同一 Partition 的消息会附带 Sequence Number。而 Broker 端会对 <PID, Partition, SeqNumber> 做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。

3.3 Consumer 扩展

我们再来看看 Kafka 基础架构:

名词解释如下:

  1. Producer :消息生产者,就是向 kafka broker 发消息的客户端;
  2. Consumer :消息消费者,向 kafka broker 取消息的客户端;
  3. Consumer Group(CG):消费者组,由多个 Consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  4. Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
  5. Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic;
  6. Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker (即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
  7. Replication:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower
  8. Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
  9. Follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 leader。

消费方式: consumer 采用 pull(拉)模式 从 broker 中读取数据。push(推)模式 很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的,它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。pull 模式不足之处是,如果 Kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

分区分配策略: 一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费。Kafka 有三种分配策略:RoundRobin,Range,Sticky。

分区分配策略之 RoundRobin:

分区分配策略之 Range :

总结:在同一个消费者组中,一个 partition 同时只能有一个消费者消费数据如果消费者的个数小于分区的个数,一个消费者会消费多个分区的数据。
如果消费者的个数大于分区的个数,则多余的消费者不消费数据。所以,对于一个topic,同一个消费者组中推荐不能有多于分区个数的消费者,否则将意味着某些消费者将无法获得消息。组间:多个消费者组消费相同的数据,互不影响。

offset的维护: 由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,因为频繁操作 Zookeeper 性能不高,所以 kafka 在自己的 topic 中负责维护消费者的 offset 信息。从0.9版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic为 __consumer_offsets。

消费者组案例:测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费。

  1. 在 bigdata01 上修改 /data/soft/kafka_2.12-2.4.1/config/consumer.properties 配置文件中的 group.id 属性为任意组名。
  2. 克隆三个会话窗口,一个作为生产者,剩下两个作为消费者,分别启动:bin/kafka-console-producer.sh --topic demo --broker-list bigdata01、bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic demo --consumer.config config/consumer.properties
  3. 查看不同会话窗口中消费者的消费情况,发现同一时刻只能有一个消费者消费。

消费offset案例:

  1. 思想: __consumer_offsets 为 kafka 中的 topic, 那就可以通过消费者进行消费。
  2. 修改配置文件 consumer.properties。vi consumer.properties、添加:exclude.internal.topics=false(不排除内部的topic)
  3. 创建一个topic。bin/kafka-topics.sh --create --topic demo --zookeeper bigdata01:2181 --partitions 2 --replication-factor 2
  4. 启动生产者和消费者,分别往 demo 生产数据和消费数据。bin/kafka-console-producer.sh --topic demo --broker-list bigdata01:9092、bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --topic demo --bootstrap-server bigdata01:9092
  5. 消费offset。bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server bigdata01:9092 --formatter “kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter” --consumer.config config/consumer.properties --from-beginning
  6. 消费到的数据。

Consumer 消费顺序: 当一个消费者消费一个 partition 时候,消费的数据顺序和此 partition 数据的生产顺序是一致的,当一个消费者消费多个 partition 时候,消费者按照 partition 的顺序,首先消费一个 partition,当消费完一个 partition 最新的数据后再消费其它 partition 中的数据。总之:如果一个消费者消费多个 partiton,只能保证消费的数据顺序在一个 partition 内是有序的,也就是说消费 kafka 中的数据只能保证消费 partition 内的数据是有序的,多个 partition 之间是无序的。

3.4 Kafka 核心之存储和容错机制

Topic、Partition 扩展: Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 Producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset(每条消息在log文件中的位置)。消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,越多 partition 可以容纳更多的 consumer,有效提升并发消费的能力。具体什么时候增加 topic 的数量?什么时候增加 partition 的数量呢?业务类型增加需要增加 topic、数据量大需要增加 partition。

针对 Kafka 中 Topic 命名的小技巧: 建议在给 Topic 命名的时候在后面跟上 r2p10 之类的内容

r2:表示Partition的副本因子是2
p10:表示这个Topic的分区数是10

这样的好处是后期我们如果要写消费者消费指定 topic 的数据,通过 topic 的名称我们就知道应该设置多少个消费者消费数据效率最高。因为一个 partition 同时只能被一个消费者消费,所以效率最高的情况就是消费者的数量和 topic 的分区数量保持一致。在这里通过 topic 的名称就可以直接看到,一目了然。但是也有一个缺点,就是后期如果我们动态调整了 topic 的 partiton,那么这个 topic 名称上的 partition 数量就不准了,针对这个 topic,建议大家一开始的时候就提前预估一下,可以多设置一些 partition,我们在工作中的时候针对一些数据量比较大的 topic 一般会设置 40~50 个 partition,数据量少的 topic 一般设置 5~10 个 partition,这样后期调整 topic partition 数量的场景就比较少了。

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件——“.index”文件和“.log”文件。 这些文件位于一个文件夹下,该文件夹的命名规则为:topic 名称+分区序号。例如,hello这个 topic 有两个分区,则其对应的文件夹为 hello-0、hello-1。

分别进入到 hello-0 以及 hello-1 目录查看,如下:

index 和 log 文件以当前 segment 的第一条消息的 offset 命名,下图为 index 文件和 log 文件的结构示意图:

这个图其实不是太准确,kafka 不会每条消息都会维护一个索引。“.index”文件存储大量的索引信息,“.log”文件存储大量的数据, 索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

Message 扩展: 每条 Message 包含了以下三个属性:

  1. offset 对应类型:long 表示此消息在一个 partition 中的起始的位置。可以认为 offset 是 partition 中 Message 的 id,自增的。
  2. MessageSize 对应类型:int32 此消息的字节大小。
  3. data 是 Message 的具体内容。

在 kafka 中每个 topic 包含1到多个 partition,每个 partition 存储一部分 Message。每条 Message 包含三个属性,其中有一个是 offset。offset 相当于 partition 中这个 message 的唯一id,那么如何通过id高效的找到message?两大法宝:分段+索引。 kafak 中数据的存储方式是这样的:每个 partition 由多个 segment【片段】组成,每个 segment 中存储多条消息,每个 partition 在内存中对应一个index,记录每个 segment 中的第一条消息偏移量。

总结:Kafka 中数据的存储流程是这样的,生产者生产的消息会被发送到 topic 的多个 partition 上,topic 收到消息后往对应 partition 的最后一个 segment 上添加该消息,segment 达到一定的大小后会创建新的 segment。

容错机制:当Kafka集群中的一个Broker节点宕机,会出现什么现象?

下面来演示一下,使用 kill -9 杀掉 bigdata01 中的 broker 进程测试:

我们可以先通过 zookeeper 来查看一下,因为当 kafka 集群中的 broker 节点启动之后,会自动向 zookeeper 中进行注册,保存当前节点信息:[root@bigdata01 hello-1]# cd /data/soft/zookeeper-3.5.8/、[root@bigdata01 zookeeper-3.5.8]# bin/zkCli.sh

此时发现 zookeeper 的 /brokers/ids 下面只有2个节点信息,可以通过get命令查看节点信息,这里面会显示对应的主机名和端口号:

然后再使用 describe 查询 topic 的详细信息,会发现此时的分区的 leader 全部变成了目前存活的另外两个节点,此时可以发现 Isr 中的内容和 Replicas 中的不一样了,因为 Isr 中显示的是目前正常运行的节点,所以当 Kafka 集群中的一个 Broker 节点宕机之后,对整个集群而言没有什么特别的大影响,此时集群会给 partition 重新选出来一些新的 Leader 节点。

当 Kafka 集群中新增一个 Broker 节点,会出现什么现象?新加入一个 broker 节点,zookeeper 会自动识别并在适当的机会选择此节点提供服务。再次启动 bigdata01 节点中的 broker 进程测试。命令:[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties,此时到 zookeeper 中查看一下:

发现 broker.id 为0的这个节点信息也有了,在通过 describe 查看 topic 的描述信息,Isr 中的信息和 Replicas 中的内容是一样的了:

但是启动后有个问题:发现新启动的这个节点不会是任何分区的 leader?怎么重新均匀分配呢?Broker 中的自动均衡策略(默认已经有):

auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds 默认值:300

手动执行:

bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions

执行后的效果如下,这样就实现了均匀分配:

3.5 Kafka 高效读写数据

  1. 顺序写磁盘。Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
  2. 应用 PageCache 。Kafka 数据持久化是直接持久化到 PageCache 中,这样会产生以下几个好处: I/O Scheduler 会将连续的小块写组装成大块的物理写从而提高性能、I/O Scheduler 会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间、充分利用所有空闲内存(非 JVM 内存)。如果使用应用层 Cache(即 JVM 堆内存),会增加 GC 负担、读操作可直接在 PageCache 内进行。如果消费和生产速度相当,甚至不需要通过物理磁盘(直接通过 PageCache)交换数据、如果进程重启,JVM 内的 Cache 会失效,但 PageCache 仍然可用,尽管持久化到 PageCache 上可能会造成宕机丢失数据的情况,但这可以被 Kafka 的 Replication 机制解决。如果为了保证这种情况下数据不丢失而强制将 PageCache 中的数据 Flush 到磁盘,反而会降低性能。
  3. 零复制技术。

3.6 Zookeeper 在 Kafka 中的作用

Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的。以下为 partition 的 leader 选举过程:

3.7 Kafka 事务

Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

  1. Producer 事务。为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer 获得的 PID 和 Transaction ID 绑定。这样当 Producer 重启后就可以通过正在进行的 Transaction ID 获得原来的 PID。为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
  2. Consumer 事务(精准一次性消费)。上述事务机制主要是从 Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。如果想完成 Consumer 端的精准一次性消费,那么需要 kafka 消费端将消费过程和提交 offset 过程做原子绑定。此时我们需要将 kafka 的 offset 保存到支持事务的自定义介质(比如mysql)。

kakfa从入门到放弃:kafka入门,环境搭建,命令行操作(代码片段)

文章目录一、消息队列:1.消息队列:2.消息队列中间:3.消息队列的应用场景:4.生产者,消费者模型:5.消息队列的两种模式:5.1点对点模式5.2发布订阅模式二、Kafka简介:1.什么是kafka2,应用场景:3.kafka生态圈三、kafka环境部署1.搭建集群环... 查看详情

kakfa从入门到放弃:kafka入门,环境搭建,命令行操作(代码片段)

文章目录一、消息队列:1.消息队列:2.消息队列中间:3.消息队列的应用场景:4.生产者,消费者模型:5.消息队列的两种模式:5.1点对点模式5.2发布订阅模式二、Kafka简介:1.什么是kafka2,应用场景:3.kafka生态圈三、kafka环境部署1.搭建集群环... 查看详情

大数据技术之kafkakafka概述kafka快速入门kafka架构深入(代码片段)

文章目录1Kafka概述1.1定义1.2消息队列1.2.1传统消息队列的应用场景1.2.2消息队列的两种模式1.3Kafka基础架构2Kafka快速入门2.1安装部署2.2Kafka命令行操作3Kafka架构深入3.1Kafka工作流程及文件存储机制3.2Kafka生产者3.2.1分区策略3.2.2数据... 查看详情

大数据技术之kafkakafka概述kafka快速入门kafka架构深入(代码片段)

文章目录1Kafka概述1.1定义1.2消息队列1.2.1传统消息队列的应用场景1.2.2消息队列的两种模式1.3Kafka基础架构2Kafka快速入门2.1安装部署2.2Kafka命令行操作3Kafka架构深入3.1Kafka工作流程及文件存储机制3.2Kafka生产者3.2.1分区策略3.2.2数据... 查看详情

大数据技术之kafkakafka概述kafka快速入门kafka架构深入(代码片段)

文章目录1Kafka概述1.1定义1.2消息队列1.2.1传统消息队列的应用场景1.2.2消息队列的两种模式1.3Kafka基础架构2Kafka快速入门2.1安装部署2.2Kafka命令行操作3Kafka架构深入3.1Kafka工作流程及文件存储机制3.2Kafka生产者3.2.1分区策略3.2.2数据... 查看详情

kafka从入门到进阶(代码片段)

...流平台有三个关键功能:发布和订阅流记录,类似于一个消息队列或企业消息系统以一种容错的持久方式存储记录流在流记录生成的时候就处理它们1.2 Kafka通常用于两大类应用:构建实时流数据管道,在系统或应用程序之间... 查看详情

day449.kafka概述&快速入门-kafka(代码片段)

...fka概述一、定义Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。二、消息队列1、传统消息队列的应用场景MQ传统应用场景之异步处理异步、削峰、解耦使用消息队列... 查看详情

消息队列-kafka01-kafka入门使用(代码片段)

一、引入kafkapom三方配置<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0</version></dependency>二、生产者使用实例1.生产者 查看详情

kafka入门到精通(代码片段)

文章目录一、kafka概述?1.定义1.2消息队列1.2.1传统消息队列的使用场景1.2.2消息队列好处1.2.3消息队列两种模式1.3kafka基础架构二、kafka快速入门1.1使用docker-compose安装kafka1.2测试访问kafka-manager1.3查看kafka版本号1.4查看zookeeper版... 查看详情

消息队列之-kafka原理讲解(代码片段)

文章目录一、什么是消息队列二、消息队列的四种场景介绍2.1异步处理2.2应用解耦2.3流量削峰2.4日志处理二、什么是Kafka三、Kafka内部原理3.1Broker3.2TopicandLog3.3Partition3.4Producer3.5Consumer3.6Replicas3.7ISR3.8ConsumerPosition3.9MessageDeliverySemantic... 查看详情

数据仓库hive从入门到小牛(代码片段)

目录一、数据仓库的介绍1.1数据仓库的基本概念1.2数据仓库的主要特征1.3数据仓库与数据库区别1.4数据仓库分层架构1.5数据仓库之ETL二、Hive简介2.1什么是Hive?2.2为什么使用Hive?2.3Hive的体系结构2.4Hive与关系型数据库区别三、Hive的... 查看详情

消息队列之kafka(ha)(代码片段)

1.kafka的HA(1)zookeeper在kafka中的作用:  -Zookeeper帮助kafka集群运行:存储一些元数据,还会帮助kafka集群进行管理(选主)  -存储关于消费者消费了哪些topic到那个进度的数据。 (2)kafkaHA解决的问题: &emsp... 查看详情

kafka消息队列(代码片段)

...使用Java和Scala编写的一个快速可扩展的高吞吐量的分布式消息队列系统。kafka将数据持久化存储到磁盘上,自带分区和副本机制,因而具有较好的持久化保证。但是kafka的消息消费没有确认机制,可能因为consumer崩溃导致消息没有... 查看详情

day449.kafka概述&快速入门-kafka(代码片段)

...fka概述一、定义Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。二、消息队列1、传统消息队列的应用场景MQ传统应用场景之异步处理异步、削峰、解耦使用消息队列... 查看详情

分布式之消息队列复习精讲(代码片段)

本文大概围绕如下几点进行阐述:为什么使用消息队列?使用消息队列有什么缺点?消息队列如何选型?如何保证消息队列是高可用的?如何保证消息不被重复消费?如何保证消费的可靠性传输?如何保证消息的顺序性?我们围绕以上... 查看详情

大数据场景下的消息队列:kafka3.0快速入门(代码片段)

...#xff09;什么是KafkaKafka是一个分布式的基于发布/订阅模式的消息队列,同时它又是一个分布式的事件流平台。既可作为消息队列,又可作为数据管道、流分析的应用。目前Kafka的最大应用还是消息队列。市面上主流的消息队... 查看详情

大数据场景下的消息队列:kafka3.0快速入门(代码片段)

...#xff09;什么是KafkaKafka是一个分布式的基于发布/订阅模式的消息队列,同时它又是一个分布式的事件流平台。既可作为消息队列,又可作为数据管道、流分析的应用。目前Kafka的最大应用还是消息队列。市面上主流的消息队... 查看详情

大数据场景下的消息队列:kafka3.0快速入门(代码片段)

...#xff09;什么是KafkaKafka是一个分布式的基于发布/订阅模式的消息队列,同时它又是一个分布式的事件流平台。既可作为消息队列,又可作为数据管道、流分析的应用。目前Kafka的最大应用还是消息队列。市面上主流的消息队... 查看详情