kafka小笔记(代码片段)

脚丫先生 脚丫先生     2023-03-01     383

关键词:


一、kafka命令

进入到kafka目录的bin下:

1.查看所有主题

./kafka-topics.sh --list --zookeeper zk:2181

2.创建主题

./kafka-topics.sh --create --zookeeper zk:2181 --replication-factor 1 --partitions 1 --topic test

3.查看topic详情

./kafka-topics.sh --describe --zookeeper zk:2181 --topic test

示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。

4.查看所有topic详情

./kafka-topics.sh --describe --zookeeper zk:2181 

5.删除topic

./kafka-topics.sh --delete --zookeeper zk:2181 --topic test

6.消费topic

./kafka-console-consumer.sh --bootstrap-server zk:9001 --topic DIMTopic

7.生产者topic

./kafka-console-producer.sh --broker-list zk:9001 --topic DIMTopic

二、彻底删除kafka的topic

彻底删除Kafka中的topic

1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录

2、Kafka 删除topic的命令是:

 ./bin/kafka-topics  --delete --zookeeper 【zookeeper server】  --topic 【topic name】

 如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion

 你可以通过命令:./bin/kafka-topics --zookeeper 【zookeeper server】 --list 来查看所有topic


 此时你若想真正删除它,可以如下操作:

 (1)登录zookeeper客户端:命令:./bin/zookeeper-client

 (2)找到topic所在的目录:ls /brokers/topics

 (3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。


另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】,

如果你删除了此处的topic,那么marked for deletion 标记消失

zookeeper 的config中也有有关topic的信息: ls /config/topics/【topic name】暂时不知道有什么用

总结:

彻底删除topic:

1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录

2、如果配置了delete.topic.enable=true直接通过命令删除,如果命令删除不掉,直接通过zookeeper-client 删除掉broker下的topic即可。

三、kafka中的acks参数

首先这个acks参数,是在KafkaProducer,也就是生产者客户端里设置的也就是说,你往kafka写数据的时候,就可以来设置这个acks参数。

这个参数实际上有三种常见的值可以设置,分别是:0、1 和 all。

  • 第一种选择是把acks参数设置为0

KafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在哪怕Partition Leader上落到磁盘,我就不管他了,直接就认为这个消息发送成功了。
如果你采用这种设置的话,那么你必须注意的一点是,可能你发送出去的消息还在半路。结果呢,Partition Leader所在Broker就直接挂了,然后结果你的客户端还认为消息发送成功了,此时就会导致这条消息就丢失了。

  • 第二种选择是设置 acks = 1

只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息了。这种设置其实是kafka默认的设置,大家请注意,划重点!这是默认的设置也就是说,默认情况下,你要是不管acks这个参数,只要Partition Leader写成功就算成功。但是这里有一个问题,万一Partition Leader刚刚接收到消息,Follower还没来得及同步过去,结果Leader所在的broker宕机了,此时也会导致这条消息丢失,因为人家客户端已经认为发送成功了。

  • 最后一种情况,就是设置acks=all

Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功了。如果说Partition Leader刚接收到了消息,但是结果Follower没有收到消息,此时Leader宕机了,那么客户端会感知到这个消息没发送成功,他会重试再次发送消息过去。
此时可能Partition 2的Follower变成Leader了,此时ISR列表里只有最新的这个Follower转变成的Leader了,那么只要这个新的Leader接收消息就算成功了。

acks=all 就可以代表数据一定不会丢失了吗?
当然不是,如果你的Partition只有一个副本,也就是一个Leader,任何Follower都没有,你认为acks=all有用吗?
当然没用了,因为ISR里就一个Leader,他接收完消息后宕机,也会导致数据丢失。
所以说,这个acks=all,必须跟ISR列表里至少有2个以上的副本配合使用,起码是有一个Leader和一个Follower才可以。这样才能保证说写一条数据过去,一定是2个以上的副本都收到了才算是成功,此时任何一个副本宕机,不会导致数据丢失

kafka学习笔记(代码片段)

安装kafka下载下载window的kafka地址window的kafka只是为了方便学习安装地址:kafka.apache.org/安装解压zip为文件夹启动kafkakafka服务器的功能相当于RocketMQ中的broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录... 查看详情

kafka笔记整理(代码片段)

[TOC]Kafka笔记整理(一)Kafka简介消息队列(MessageQueue)消息Message网络中的两台计算机或者两个通讯设备之间传递的数据。例如说:文本、音乐、视频等内容。队列Queue一种特殊的线性表(数据元素首尾相接),特殊之处在于只允... 查看详情

kafka学习笔记&面经分享(代码片段)

目录定义使用消息队列的好处消息队列的两种模式点对点模式发布/订阅模式kafka基础架构kafka架构kafka工作流程kafka文件存储机制kafka生产者分区策略分区的原因分区的原则数据可靠性保证副本数据同步策略ISRack应答机制故障处理... 查看详情

kafka笔记整理:消费形式验证与性能测试(代码片段)

[TOC]Kafka笔记整理(三):消费形式验证与性能测试Kafka消费形式验证前面的《Kafka笔记整理(一)》中有提到消费者的消费形式,说明如下:1、每个consumer属于一个consumergroup,可以指定组id。group.id2、消费形式:组内:组内的消... 查看详情

学习笔记kafka——kafkaconsumerapi及开发实例(代码片段)

一、KafkaConsumerAPI1.1、Consumer1.2、KafkaConsumer1.3、ConsumerRecords1.4、ConsumerRecord1.5、KafkaConsumer实战packagedemo02;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clien 查看详情

学习笔记kafka——kafkaconsumerapi及开发实例(代码片段)

一、KafkaConsumerAPI1.1、Consumer1.2、KafkaConsumer1.3、ConsumerRecords1.4、ConsumerRecord1.5、KafkaConsumer实战packagedemo02;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clien 查看详情

学习笔记kafka——kafka开发环境配置及producerapi(代码片段)

一、开发环境说明1、创建Maven工程1.1、开发环境Maven&&JDK1.2、Pom配置CompilerConfiguration在pom.xml添加:<properties><maven.compiler.target>1.8</maven.compiler.target><maven.compiler.source&g 查看详情

学习笔记kafka——kafka开发环境配置及producerapi(代码片段)

一、开发环境说明1、创建Maven工程1.1、开发环境Maven&&JDK1.2、Pom配置CompilerConfiguration在pom.xml添加:<properties><maven.compiler.target>1.8</maven.compiler.target><maven.compiler.source&g 查看详情

学习笔记kafka——kafka安装配置——kafka单代理及常用操作(代码片段)

一、Linux环境准备Centos7,1CPU,2GMemory,20GDisk,VirtualSystemJDKZookeeper二、Kafka安装下载Kafka安装包官网:http://kafka.apache.org/downloads解压安装包:tar-zxvfkafka_2.11-2.3.1.tgz-C/ 查看详情

学习笔记kafka——kafka安装配置——kafka单代理及常用操作(代码片段)

一、Linux环境准备Centos7,1CPU,2GMemory,20GDisk,VirtualSystemJDKZookeeper二、Kafka安装下载Kafka安装包官网:http://kafka.apache.org/downloads解压安装包:tar-zxvfkafka_2.11-2.3.1.tgz-C/ 查看详情

kafka笔记整理:消费形式验证与性能测试(代码片段)

Kafka消费形式验证前面的《Kafka笔记整理(一)》中有提到消费者的消费形式,说明如下:1、每个consumer属于一个consumergroup,可以指定组id。group.id2、消费形式:组内:组内的消费者消费同一份数据;同时只能有一个consumer消费一... 查看详情

kafka学习笔记(代码片段)

1.官网 ​​​​​​​​​​​​​​​​​​​​​​​​​ApacheKafka2.akfX轴拆分:水平复制,就是讲单体系统多运行几个实例,做集群加负载均衡的模式,主主、主备、主从。解决单点,高可用问题Y轴拆分:基于不... 查看详情

学习笔记kafka——kafka安装配置——kafka多代理配置及常用操作(代码片段)

一、环境准备Centos7,1CPU,2GMemory,20GDisk,VirtualSystemHosts:node110,node111,node112全部配置JDK配置Zookeeper集群1.1、Linux环境准备克隆node110到node111,node112修改机器名和IP配置(有需要可以参考我这篇博 查看详情

学习笔记kafka——kafka安装配置——kafka多代理配置及常用操作(代码片段)

一、环境准备Centos7,1CPU,2GMemory,20GDisk,VirtualSystemHosts:node110,node111,node112全部配置JDK配置Zookeeper集群1.1、Linux环境准备克隆node110到node111,node112修改机器名和IP配置(有需要可以参考我这篇博 查看详情

学习笔记kafka——kafka与spark集成——原理介绍与开发环境配置实战(代码片段)

一、环境1.1、Hadoop环境1.2、Spark环境1.3、SparkStreaming1.4、AddMavenDependencies&开发流程AddScalaFrameworkSupport添加依赖(在pom.xml添加)<dependency><groupId>org.apache.spark</groupId>< 查看详情

学习笔记kafka——kafka与spark集成——原理介绍与开发环境配置实战(代码片段)

一、环境1.1、Hadoop环境1.2、Spark环境1.3、SparkStreaming1.4、AddMavenDependencies&开发流程AddScalaFrameworkSupport添加依赖(在pom.xml添加)<dependency><groupId>org.apache.spark</groupId>< 查看详情

学习笔记kafka——kafka与spark集成——原理介绍与开发环境配置实战(代码片段)

一、环境1.1、Hadoop环境1.2、Spark环境1.3、SparkStreaming1.4、AddMavenDependencies&开发流程AddScalaFrameworkSupport添加依赖(在pom.xml添加)<dependency><groupId>org.apache.spark</groupId>< 查看详情

学习笔记kafka——kafka安装配置——jdk&zookeeper安装zookeeper常用操作(代码片段)

一、Linux环境准备Centos7,1CPU,2GMemory,20GDisk,VirtualSystemHostname:node110.centos.com、node111.centos.com、node112.centos.comIPAddress:192.168.128.110Linux连接工具:Xshell、Xftp 查看详情