kafka初步学习(代码片段)

KINGHEY KINGHEY     2022-10-24     542

关键词:

一、官网教程案例学习
 
Kafka — 分布式消息队列
 
消息系统
消息中间件:缓冲于生产与消费中间
缓冲满了,可以进行Kafka的扩容
 
特性:
水平扩展性、容错性、实时、快
 
 
Kafka架构:
技术分享图片
 
 
理解producer、consumer、broker(缓冲区)、topic(标签)
 
 一个配置文件(server.properties)相当于一个broker
 
 
单节点(一台机器)的Kafka部署方法:
 
开启的时候记得创建多个控制台,方便分别在上面同时启动server(broker)、producer、consumer
 
1. 单broker部署:
 
准备工作:
先安装zookeeper,解压完后只需要更改conf目录下的zoo.cfg,改变dataDir不保存在tmp目录
ZK简单的使用,bin目录下的zkServer启动服务器,然后通过zkCli来连接
 
配置Kafka:
config目录下:
server.properties:
broker.id
listeners
host.name
 
启动:在KAFKA_HOME下
先启动ZK server
zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
再启动kafka server,启动时要加上config配置文件
kafka-server-start.sh $KAFKA_HOME/config/server.properties
 
创建topic:指定zookeeper端口
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic
kafka-topics.sh --list --zookeeper localhost:2181
查看topic详细信息
describe命令,可查看活的broker有哪个,leader是哪个等
 
发送消息(生产):指定broker
kafka-console-producer.sh --broker-list localhost:9092 --topic test
 
注意:其中2181端口对应zookeeper server,而9092对应listener broker
 
消费消息:指定zk
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
 
注意:带有beginning参数的话,会把历史所有的都一起读取
 
 
2. 多broker部署:
 
复制多个server-properties
更改其中的broker.id  listeners   log.dir
 
启动多个kafka server:
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties
 
-daemon在后台运行
&代表还有下几行
启动成功后jps中有三个kafka
 
创建多副本topic:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-repli
 
发送和单broker一样,只不过改成多个端口
 
 多broker的容错机制:
如果leader broker干掉了,就会选举新的,也就是干掉任意哪种broker都不会影响全局的使用
 
 
 
 
二、IDEA+Maven环境开发:
 
配置环境:
 
创建scala模版:
技术分享图片
 
填信息:
技术分享图片
 
修改setting路径:
技术分享图片
 
创建完成scala project
修改pom.xml文件:
添加与删除dependency
kafka的版本:
<kafka.version>0.9.0.0</kafka.version>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>$kafka.version</version>
  </dependency>
</dependencies>
 
 创建Java文件夹,并把它改成source属性(蓝色),在IDEA右上角改
 
 
 三、用Java API来完成Kafka的Producer和Consumer的编程:
 
 
Producer:
 
首先定义Kafka中的常用变量类,brokerlist、ZK端口、topic名称
/*
* Kafka配置文件, 用于定义producer, consumer
* */
public class KafkaProperties 
 
    //定义端口号
    public static final String ZK = "localhost:2181";
    public static final String TOPIC = "hello_topic";
    public static final String BROKER_LIST = "localhost:9092";

 

 
然后创建producer:
  1. 定义全局变量topic,producer(选择kafka.javaapi.producer包)
  2. 写构造函数,包括了:
  3. 外部传入topic
  4. 创建producer,需要传入ProducerConfig对象
  5. PC对象需要传入一些参数,用properties类(java.util包)来传入
  6. properties对象中需要为PC对象设置”metadata.broker.list" “serializer.class" "request.required.acks"
 
最后通过Thread线程run方法来启动producer发送信息
(本测试实现的每隔2s发送一个message)
 
实现代码:
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
import java.util.Properties;
 
/*
* Kafka生产者
* */
public class KafkaProducer extends Thread
 
    private String topic;
    //选择kafka.javaapi.producer
    private Producer<Integer, String> producer;
 
    //构造方法,传入topic,生成producer
    public KafkaProducer(String topic) 
 
        this.topic = topic;
 
        //用properties设置ProducerConfig所需要的参数, 这是生成Producer的前提
        //分别是broker_list, 序列化, 握手机制
        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");  //此处序列化类用String
        properties.put("request.required.acks", "1");  //可设置为0, 1, -1, 一般生产用1, 最严谨是-1, 不能用0
 
        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    
 
    //用线程来启动producer
    @Override
    public void run() 
 
        int messageNo = 1;
 
        while(true) 
            String message = "massage_" + messageNo;
            producer.send(new KeyedMessage<Integer, String>(topic, message));
            System.out.println("send: " + message);
 
            messageNo++;
 
            //2s间隔发送一次
            try 
                Thread.sleep(2000);
             catch(Exception e) 
                e.printStackTrace();
            
        
    

 

 
 
Consumer:
 
创建过程:
  1. 构造方法中传入topic
  2. 创建createConnector方法,返回值是一个ConsumerConnector,注意不直接是Consumer
  3. 按照producer一样的方法,往ConsumerConnector中传入所需要的属性zookeeper.connect group.id
 
执行过程:通过Thread的run方法改写:
  1. 为了创建messageStream,先创建一个Map,装topic和kafka stream的数量
  2. 创建messageStream,并获取每次的数据
  3. 对messageStream进行迭代,获取消息
 
实现代码:
 
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
 
import static com.sun.org.apache.xml.internal.security.keys.keyresolver.KeyResolver.iterator;
import static javafx.scene.input.KeyCode.V;
 
/*
* Kafka消费者
* */
public class KafkaConsumer extends Thread 
 
    private String topic;
 
    public KafkaConsumer(String topic) 
 
        this.topic = topic;
    
 
    //ConsumerConnector选择kafka.javaapi.consumer包
    //此处是要创建consumer连接器, 而不是创建consumer, 区别于producer
    private ConsumerConnector createConnector() 
 
        //同样地设置ConsumerConfig对象的属性
        //需要设置ZK
        Properties properties = new Properties();
        properties.put("zookeeper.connect", KafkaProperties.ZK);
        properties.put("group.id", KafkaProperties.GROUP_ID);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    
 
 
    //线程启动consumer
    @Override
    public void run() 
 
        ConsumerConnector consumer = createConnector();
 
        //由于createMessageStreams需要传入一个Map, 所以创建一个
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        //map中放入topic和kafka stream的数量
        topicCountMap.put(topic, 1);
 
        //创建messageStream, 从源码中可看出它的数据类型
        //String是topic, List是数据比特流
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);
        //获取每次的数据
        KafkaStream<byte[], byte[]> byteStream = messageStream.get(topic).get(0);
 
        //数据流进行迭代
        ConsumerIterator<byte[], byte[]> iterator = byteStream.iterator();
 
        while (iterator.hasNext()) 
 
            //由于iterator里面的是byte类型,要转为String
            String message = new String(iterator.next().message());
            System.out.println("receive:" + message);
        
    

 

 
四、Kafka简易实战
 
整合Flume和Kafka完成实时数据采集
 
Kafka sink作为producer连接起来
 
技术选型:
Agent1: exec source -> memory channel -> avro sink
Agent2: avro source -> memory channel -> kafka sink(producer)
producer -> consumer
 
 
配置exec-memory-avro:
 
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
 
# Describe/configure the source
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /usr/local/mycode/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
 
# Describe the sink
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = localhost
exec-memory-avro.sinks.avro-sink.port = 44444
 
# Use a channel which buffers events in memory
exec-memory-avro.channels.memory-channel.type = memory
 
# Bind the source and sink to the channel
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
 
 
 
配置avro-memory-kafka:
 
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel
 
# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = localhost
avro-memory-kafka.sources.avro-source.port = 44444
 
# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = localhost:9092
avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_topic
avro-memory-kafka.sinks.kafka-sink.kafka.flumeBatchSize = 5
avro-memory-kafka.sinks.kafka-sink.kafka.kafka.producer.acks = 1
 
# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory
 
# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
 
 
 
启动两个flume agent:(注意先后顺序)
 
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf --name avro-memory-kafka -Dflume.root.logger=INFO,console
 
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf --name exec-memory-avro -Dflume.root.logger=INFO,console
 
 
 
启动kafka consumer:
 
kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic
 
执行过程比较慢!要等一下 concumer的控制台才有数据显示
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

对countdownlatch的初步学习(代码片段)

CountDownLatch的中文翻译为"闭锁",在JDK1.5中CountDownLatch类加入进来。为程序猿进行并发编程提供有利的帮助。 首先我们先看看JDK文档中对于CountDownLatch类的介绍:Asynchronizationaidthatallowsoneormorethreadstowaituntilasetofoperationsbeingperforme... 查看详情

echarts初步学习(代码片段)

官方教程学习: 转自官网http://echarts.baidu.com/ ECharts3开始不再强制使用AMD的方式按需引入,代码里也不再内置AMD加载器。因此引入方式简单了很多,只需要像普通的JavaScript库一样用script标签引入。<!DOCTYPEhtml><html>... 查看详情

jsmap()初步学习(代码片段)

//array.map(callback,thisObject?),callback需要有return值//map:‘映射‘被映射成新的数组 eg1:letdata=[3,4,2];letdata2=data.map(item=>returnitem*5)console.log(data2)  //eg2:letusers=[name:"天海佑希","em 查看详情

kafka学习之路kafka的安装(代码片段)

zookeeper1:192.168.1.11zookeeper2:192.168.1.12zookeeper3:192.168.1.13kafka1:192.168.1.14kafka2:192.168.1.15kafka3:192.168.1.16kafka3:192.168.1.17kafka-manager:192.168.1.18一、下载下载地址:http://kafka.apache. 查看详情

kafka学习笔记(代码片段)

安装kafka下载下载window的kafka地址window的kafka只是为了方便学习安装地址:kafka.apache.org/安装解压zip为文件夹启动kafkakafka服务器的功能相当于RocketMQ中的broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录... 查看详情

kafka学习之路kafka的架构(代码片段)

一、Kafka的集群架构如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的PageView,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干ConsumerGro... 查看详情

kafka集群搭建(代码片段)

...或复制Kafka安装包2解压缩Kafka安装包3配置Kafka集群4Kafka的初步应用4.1创建主题4.2发送消息4.3消费消息1下载或复制Kafka安装包http://archive.apache.org/dist/kafka/0.10.1.0/2解压缩Kafka安装包tar-zxvfkafka_2.11-0.10.1.0.gz3配置Kafka集群配置Kafka集群时... 查看详情

3.kafka学习(代码片段)

kafka是什么Kafka最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的,发布/订阅模式的消息队列(MessageQueue),Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。经过多年发展,Kafka已... 查看详情

centos初步学习fromlanguagec(代码片段)

一、Start1、编译器安装yuminstallgcc-y#c编译器yuminstallgcc-c++-y#c++编译器2、第一个C程序//文件名:txl.c#include<stdio.h>//标准输出输入intmain()printf(“hello”);return0;3、编译gcctxl.c-otxl-o:编译并输出一个可执行文件  查看详情

消息系统概述(代码片段)

草捏子最近开始学习Kafka。在学习前,给自己确定了下学习的范围,大致如下:理解Kafka的相关概念;掌握Kafka的基本API使用;了解Kafka的背后原理。后续将在这学习范围内输出一些相关文章。那么本文作为Kafka系列的第一篇文章... 查看详情

storm初步(代码片段)

初入Storm前言学习Storm已经有两周左右的时间,但是认真来说学习过程确实是零零散散,遇到问题去百度一下,找到新概念再次学习,在这样的一个循环又不成体系的过程中不断学习Storm。前人栽树,后人乘凉,也正是因为网上有... 查看详情

docker了解及初步学习(代码片段)

  部署项目是一件很麻烦的事,尤其是当同一个项目部署到很多服务器的时候,这种麻烦变得更加让人难受。  在我刚到公司的时候,公司之前的先项目采用的就是使用Web服务器Tomcat部署项目,但是后来的新项目是微服务的... 查看详情

kafka学习入门与原理(代码片段)

Kafka学习(一)入门与原理初识KafkaKafka起初是由LinkedIn公司采用Scala语言开发的一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。基本概念一个典型的Kafka体系架构包括若干Producer、若干Broker、... 查看详情

日志打入kafka改造历程-我们到底能走多远系列49(代码片段)

...。另一方面,应用在服务器上会打日志文件。如图:详细初步实现首先,我们来初步实现这个方案,搭建elk略去不谈,其中特 查看详情

kafka学习之路kafka在zookeeper中的存储(代码片段)

当kafka启动的时候,就会向zookeeper里面注册一些信息,这些数据也称为Kafka的元数据信息。一、Kafka在zookeeper中存储结构图二、分析根目录下的结构服务端开启的情况下,进入客户端的命令:zookeeper目录/bin/zkCli.sh#zookeeper目录/bin/zk... 查看详情

kafka学习笔记&面经分享(代码片段)

目录定义使用消息队列的好处消息队列的两种模式点对点模式发布/订阅模式kafka基础架构kafka架构kafka工作流程kafka文件存储机制kafka生产者分区策略分区的原因分区的原则数据可靠性保证副本数据同步策略ISRack应答机制故障处理... 查看详情

机器学习sklearn的k近邻算法api初步使用(代码片段)

目录1k近邻算法api初步使用2Scikit-learn工具介绍2.1安装2.2Scikit-learn包含的内容3K-近邻算法API4案例4.1步骤分析4.2代码过程5小结1k近邻算法api初步使用K近邻算法介绍:https://blog.csdn.net/ZGL_cyy/article/details/125583129机器学习流程复习... 查看详情

vue学习之路-1.初步感知(代码片段)

 一、安装    这里使用node的npm包管理工具进行操作。操作前请先下载node。  在工程文件夹中使用以下命令安装vue:npminstallvue  如下图所示:我在helloworld 文件夹中安装vue:    // Vue不支持IE8及其以... 查看详情