flume和kafka完成实时数据的采集(代码片段)

liuge36 liuge36     2023-01-13     190

关键词:

Flume和Kafka完成实时数据的采集

写在前面
Flume和Kafka在生产环境中,一般都是结合起来使用的。可以使用它们两者结合起来收集实时产生日志信息,这一点是很重要的。如果,你不了解flume和kafka,你可以先查看我写的关于那两部分的知识。再来学习,这部分的操作,也是可以的。

实时数据的采集,就面临一个问题。我们的实时数据源,怎么产生呢?因为我们可能想直接获取实时的数据流不是那么的方便。我前面写过一篇文章,关于实时数据流的python产生器,文章地址:http://blog.csdn.net/liuge36/article/details/78596876
你可以先看一下,如何生成一个实时的数据...

思路??如何开始呢??

分析:我们可以从数据的流向着手,数据一开始是在webserver的,我们的访问日志是被nginx服务器实时收集到了指定的文件,我们就是从这个文件中把日志数据收集起来,即:webserver=>flume=>kafka

webserver日志存放文件位置
这个文件的位置,一般是我们自己设置的

我们的web日志存放的目录是在:
/home/hadoop/data/project/logs/access.log下面

[[email protected] logs]$ pwd
/home/hadoop/data/project/logs
[[email protected] logs]$ ls
access.log
[[email protected] logs]$ 

Flume

做flume,其实就是写conf文件,就面临选型的问题
source选型?channel选型?sink选型?

这里我们选择 exec source memory channel kafka sink

怎么写呢?
按照之前说的那样1234步骤

从官网中,我们可以找到我们的选型应该如何书写:
1) 配置Source
exec source

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c

2) 配置Channel
memory channel

a1.channels.c1.type = memory

3) 配置Sink
kafka sink
flume1.6版本可以参照http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0/FlumeUserGuide.html#kafka-sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1

4) 把以上三个组件串起来

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

我们new一个文件叫做test3.conf
把我们自己分析的代码贴进去:

[[email protected] conf]$ vim test3.conf 
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/project/logs/access.log
a1.sources.r1.shell = /bin/sh -c


a1.channels.c1.type = memory


a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hadoop000:9092
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.batchSize = 5
a1.sinks.k1.requiredAcks =1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里我们先不启动,因为其中涉及到kafka的东西,必须先把kafka部署起来,,

Kafka的部署

kafka如何部署呢??
参照官网的说法,我们首先启动一个zookeeper进程,接着,才能够启动kafka的server

Step 1: Start the zookeeper

[[email protected] ~]$ 
[[email protected] ~]$ jps
29147 Jps
[[email protected] ~]$ zkServer.sh start
JMX enabled by default
Using config: /home/hadoop/app/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[[email protected] ~]$ jps
29172 QuorumPeerMain
29189 Jps
[[email protected] ~]$ 

Step 2: Start the server

[[email protected] ~]$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
#外开一个窗口,查看jps
[[email protected] ~]$ jps
29330 Jps
29172 QuorumPeerMain
29229 Kafka
[[email protected] ~]$ 

如果,这部分不是很熟悉,可以参考http://blog.csdn.net/liuge36/article/details/78592169

Step 3: Create a topic

[[email protected] ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flume_kafka
WARNING: Due to limitations in metric names, topics with a period (‘.‘) or underscore (‘_‘) could collide. To avoid issues it is best to use either, but not both.
Created topic "flume_kafka".
[[email protected] ~]$ 

Step 4: 开启之前的agent

  [[email protected] conf]$ flume-ng agent --name a1 --conf . --conf-file ./test3.conf -Dflume.root.logger=INFO,console

Step 5: Start a consumer

kafka-console-consumer.sh --zookeeper hadoop000:2181 –topic flume-kafka

上面的第五步执行之后,就会收到刷屏的结果,哈哈哈!!
技术分享图片

上面的消费者会一直一直的刷屏,还是很有意思的!!!
这里的消费者是把接收到的数据数据到屏幕上

后面,我们会介绍,使用SparkStreaming作为消费者实时接收数据,并且接收到的数据做简单数据清洗的开发,从随机产生的日志中筛选出我们需要的数据.....
















flume整合kafka完成实时数据采集(代码片段)

agent选择agent1 execsource+memorychannel+avrosinkagent2 avrosource+memorychannel 模拟实际工作中的场景,agent1为A机器,agent2为B机器。 avrosource:监听avro端口,并且接收来自外部avro信息,avrosink:一般用于跨节点传输,主要绑定数... 查看详情

python爬虫等获取实时数据+flume+kafka+sparkstreaming+mysql+echarts实现数据动态实时采集分析展示(代码片段)

使用爬虫等获取实时数据+Flume+Kafka+SparkStreaming+mysql+Echarts实现数据动态实时采集、分析、展示主要工作流程如下所示:其中爬虫获取实时数据,并把数据实时传输到Linux本地文件夹中。使用Flume实时监控该文件夹,如果发现文件内... 查看详情

flume整合数据到kafka,sparkstreaming消费数据,并存储到hbase和redis中(代码片段)

...到hbase,并且将海口数据保存到redis当中去,实现实时轨迹监控以及历史轨迹回放的功能 为了模拟数据的实时生成,我们可以通过数据回放程序来实现订单数据的回放功能1、模拟数据生成1、海口订单数据上传     ... 查看详情

python爬虫等获取实时数据+flume+kafka+sparkstreaming+mysql+echarts实现数据动态实时采集分析展示

使用爬虫等获取实时数据+Flume+Kafka+SparkStreaming+mysql+Echarts实现数据动态实时采集、分析、展示主要工作流程如下所示:其中爬虫获取实时数据,并把数据实时传输到Linux本地文件夹中。使用Flume实时监控该文件夹,如果发现文件内... 查看详情

转:大数据架构:flume-ng+kafka+storm+hdfs实时系统组合

...储”<ignore_js_op> 1).数据采集负责从各节点上实时采集数据,选用cloudera的flume来实现2).数据接入由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apache的kafka3). 查看详情

flume+kafka+storm+redis大数据在线实时分析(代码片段)

1、实时处理框架即从上面的架构中我们可以看出,其由下面的几部分构成:Flume集群Kafka集群Storm集群从构建实时处理系统的角度出发,我们需要做的是,如何让数据在各个不同的集群系统之间打通(从上面的图示中也能很好地... 查看详情

flume+kafka+storm+redis实时分析系统基本架构(代码片段)

今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型。当然这个架构模型只是实时分析技术的一个简单的入门级架构,实际生产环境中的大数据实时分析... 查看详情

flume整合数据到kafka,sparkstreaming消费数据,并存储到hbase和redis中(代码片段)

目录1、模拟数据生成2、flume采集数据 1、node01配置flume的conf文件 2、node02开发flume的配置文件3、node03开发flume的配置文件4、开发flume启动停止脚本 5、node01执行以下命令创建kafka的topic6、启动并查看kafka的数据3、SparkStreaming消费ka... 查看详情

中国移动实时数据分析-基于spark+kafka+flume(代码片段)

  这两天主要是做了中国移动的实时数据分析一个小项目(可以说是demo了),这里记录下来整个过程里面遇到的坑,首先安装好flume,kafka,spark(基于代码本地运行可以不安装),redis,zookeeper主要是为了熟悉一下整个的一个... 查看详情

日志采集系统flume和kafka有啥区别及联系,它们分别在啥时候

...flume负责生产数据,发送至kafka。sparkstreaming作为消费者,实时的从kafka中获取数据进行计算。计算结果保存至redis,供实时推荐使用。flume+kafka+spark+redis是实时数据收集与计算的一套经典架构... 参考技术Bkafka是分布式消息队列或... 查看详情

日志采集flume配置(代码片段)

Flume直接读log日志的数据,log日志的格式是app.yyyy-mm-dd.logFlume的具体配置在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件[hadoop@hadoop102conf]$vimfile-flume-kafka.conf在文件配置如下内容#为各组件命名a1.sources=r1a1.c 查看详情

日志采集flume配置(代码片段)

Flume直接读log日志的数据,log日志的格式是app.yyyy-mm-dd.logFlume的具体配置在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件[hadoop@hadoop102conf]$vimfile-flume-kafka.conf在文件配置如下内容#为各组件命名a1.sources=r1a1.c 查看详情

flume理论

...式的数据采集flume使用简单,仅需写一个配置文件,即可完成一个分布式,高可用,可靠的实时数据流采集任务。重要概念:Flume运行的核心是Agent.Flume以agent为最小的独立运行单位。一个agent包含三个核心组件,source,channel,sink.这... 查看详情

flume+kafka+storm+redis实时分析系统基本架构(代码片段)

今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型。当然这个架构模型只是实时分析技术的一个简单的入门级架构,实际生产环境中的大数据实时分析... 查看详情

flume整合kafka(代码片段)

一、需求利用flume采集Linux下的文件信息,并且传入到kafka集群当中。环境准备zookeeper集群和kafka集群安装好。二、配置flume官网下载flume。博主自己这里使用的是flume1.6.0。官网地址http://flume.apache.org/download.html解压缩。tar-zxvfapache-f... 查看详情

flume(代码片段)

...数据全部成功提交到channel,那么source才认为该数据读取完成。同理,只有成功被sink写出去的数据,才会从channel中移除;失败后就重新提交;组成:Agent由source+channel+sink构成;source是数据来源的抽象,sink是数据去向的抽象;Sourc... 查看详情

大数据高级开发工程师——数据采集框架flume(代码片段)

文章目录数据采集框架FlumeFlume基本介绍概述运行机制Flume采集系统结构图1.简单结构2.复杂结构Flume实战案例采集网络端口数据1.Flume的安装部署2.开发配置文件3.启动4.使用telnet测试采集目录到HDFS1.需求分析2.开发配置文件3.启动&... 查看详情

kafka怎么收集到flume的日志

...需要任何开发的系统,请使用Flume。Flume可以使用拦截器实时处理数据。这些对数据屏蔽或者过量是很有用的。Kafka需要外部的流处理系统才能做到。Kafka和Flume都是可靠的系统,通过适当的配置能保证零数据丢失。然而,Flume不支... 查看详情