flume数据采集之常见集群配置案例(代码片段)

author author     2022-11-01     537

关键词:

[TOC]

非集群配置

这种情况非集群配置方式,比较简单,可以直接参考我整理的《Flume笔记整理》,其基本结构图如下:

技术分享图片

Flume集群之多个Agent一个source

结构说明

结构图如下:

技术分享图片

说明如下:

即可以把我们的Agent部署在不同的节点上,上面是两个Agent的情况。其中Agent foo可以部署在日志产生的节点上,
比如,可以是我们web服务器例如tomcat或者nginx的节点上,foo的source可以配置为监控日志文件数据的变化,
channel则可以基于内存或基于文件进行存储,而sink即日志落地可以配置为avro,即输出到下一个Agent中。

Agent bar可以部署在另一个节点上,当然跟foo在同一个节点也是没有问题,因为本身Flume是可以多个实例在同一个
节点上运行的。bar主要作用是收集来自不同avro source的节点的日志数据,实际上,如果我们的web环境是集群的,
那么web服务器就会有多个节点,这时就有多个web服务器节点产生日志,我们需要在这多个web服务器上都部署agent,
此时,bar的source就会有多个,后面的案例正是如此,不过在这个小节中,只讨论多个agent一个source的情况。
而对于agent bar的数据下沉方式,也是可以选择多种方式,详细可以参考官网文档,这里选择sink为HDFS。

不过需要注意的是,在agent foo中,source只有一个,在后面的案例中,会配置多个source,即在这一个agent中,
可以采集不同的日志文件,后面要讨论的多个source,指的是多个不同日志文件的来源,即foo中的多个source,例如
data-access.log、data-ugctail.log、data-ugchead.log等等。

配置案例

环境说明

如下:

技术分享图片

即这里有两个节点:

uplooking01:
其中的日志文件 /home/uplooking/data/data-clean/data-access.log
为web服务器生成的用户访问日志,并且每天会产生一个新的日志文件。
在这个节点上,我们需要部署一个Flume的Agent,其source为该日志文件,sink为avro。

uplooking03:
这个节点的作用主要是收集来自不同Flume Agent的日志输出数据,例如上面的agent,然后输出到HDFS中。

说明:在我的环境中,有uplooking01 uplooking02 uplooking03三个节点,并且三个节点配置了Hadoop集群。

配置

  • uplooking01
#########################################################
##
##主要作用是监听文件中的新增数据,采集到数据之后,输出到avro
##    注意:Flume agent的运行,主要就是配置source channel sink
##  下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#对于source的配置描述 监听文件中的新增数据 exec
a1.sources.r1.type = exec
a1.sources.r1.command  = tail -F /home/uplooking/data/data-clean/data-access.log

#对于sink的配置描述 使用avro日志做数据的消费
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = uplooking03
a1.sinks.k1.port = 44444

#对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • uplooking03
#########################################################
##
##主要作用是监听avro,采集到数据之后,输出到hdfs
##    注意:Flume agent的运行,主要就是配置source channel sink
##  下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#对于source的配置描述 监听avro
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

#对于sink的配置描述 使用log日志做数据的消费
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /input/data-clean/access/%y/%m/%d
a1.sinks.k1.hdfs.filePrefix = flume
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.inUsePrefix = tmpFlume
a1.sinks.k1.hdfs.inUseSuffix = .tmp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
#配置下面两项后,保存到HDFS中的数据才是文本
#否则通过hdfs dfs -text查看时,显示的是经过压缩的16进制
a1.sinks.k1.hdfs.serializer = TEXT
a1.sinks.k1.hdfs.fileType = DataStream

#对于channel的配置描述 使用内存缓冲区域做数据的临时缓存
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

测试

首先要确保会有日志生成,其输出为/home/uplooking/data/data-clean/data-access.log

在uplooking03上启动Flume Agent:

[[email protected] flume]$ flume-ng agent -n a1 -c conf --conf-file conf/flume-source-avro.conf -Dflume.root.logger=INFO,console

在uplooking01上启动Flume Agent:

flume-ng agent -n a1 -c conf --conf-file conf/flume-sink-avro.conf -Dflume.root.logger=INFO,console

一段时间后,便可以在hdfs中看到写入的日志文件:

[[email protected] ~]$ hdfs dfs -ls /input/data-clean/access/18/04/07
18/04/07 08:52:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 26 items
-rw-r--r--   3 uplooking supergroup       1131 2018-04-07 08:50 /input/data-clean/access/18/04/07/flume.1523062248369.log
-rw-r--r--   3 uplooking supergroup       1183 2018-04-07 08:50 /input/data-clean/access/18/04/07/flume.1523062248370.log
-rw-r--r--   3 uplooking supergroup       1176 2018-04-07 08:50 /input/data-clean/access/18/04/07/flume.1523062248371.log
......

查看文件中的数据:

[[email protected] ~]$ hdfs dfs -text /input/data-clean/access/18/04/07/flume.1523062248369.log
18/04/07 08:55:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1000    220.194.55.244  null    40604   0       POST /check/init HTTP/1.1       500     null    Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.3      1523062236368
1002    221.8.9.6 80    886a1533-38ca-466c-86e1-0b84022f781b    20201   1       GET /top HTTP/1.0       500     null      Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.3      1523062236869
1002    61.172.249.96   99fb19c4-ec59-4abd-899c-4059dea39ead    0       0       POST /updateById?id=21 HTTP/1.1 408       null    Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko    1523062237370
1003    61.172.249.96   886a1533-38ca-466c-86e1-0b84022f781b    10022   1       GET /tologin HTTP/1.1   null    /update/pass      Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070309 Firefox/2.0.0.3  1523062237871
1003    125.39.129.67   6839fff8-7b3a-48f5-90cd-0f45c7be1aeb    10022   1       GET /tologin HTTP/1.0   408     null      Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070309 Firefox/2.0.0.3  1523062238372
1000    61.172.249.96   89019ae0-6140-4e5a-9061-e3af74f3e4a8    10022   1       POST /stat HTTP/1.1     null    /passpword/getById?id=11  Mozilla/4.0 (compatible; MSIE 5.0; WindowsNT)   1523062238873

如果在uplooking03的Flume agent不配置hdfs.serializer=TEXT和hdfs.fileType=DataStream,那么上面查看到的数据会是16进制数据。

Flume集群之多个Agent多个source

结构说明

如下:

技术分享图片

配置案例

环境说明

在我们的环境中,如下:
技术分享图片

即在我们的环境中,日志源有三份,分别是data-access.log、data-ugchead.log、data-ugctail.log
不过在下面的实际配置中,日志源的agent我们只使用两个,uplooking01和uplooking02,它们的sink都
输出到uplooking03的source中。

配置

uplooking01uplooking02的配置都是一样的,如下:

#########################################################
##
##主要作用是监听文件中的新增数据,采集到数据之后,打印在控制台
##    注意:Flume agent的运行,主要就是配置source channel sink
##  下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

#对于source r1的配置描述 监听文件中的新增数据 exec
a1.sources.r1.type = exec
a1.sources.r1.command  = tail -F /home/uplooking/data/data-clean/data-access.log
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = static
##静态的在header中添加一个key value,下面就配置了两个拦截器,i1和i2
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
a1.sources.r1.interceptors.i2.type = timestamp
## timestamp的作用:这里配置了的话,在负责集中收集日志的flume agent就不需要配置
## a1.sinks.k1.hdfs.useLocalTimeStamp = true也能通过这些%y/%m/%d获取时间信息
## 这样一来的话,就可以减轻集中收集日志的flume agent的负担,因为此时的时间信息可以直接从source中获取

#对于source r2的配置描述 监听文件中的新增数据 exec
a1.sources.r2.type = exec
a1.sources.r2.command  = tail -F /home/uplooking/data/data-clean/data-ugchead.log
a1.sources.r2.interceptors = i1 i2
a1.sources.r2.interceptors.i1.type = static
##静态的在header中添加一个key value,下面就配置了两个拦截器,i1和i2
a1.sources.r2.interceptors.i1.key = type
a1.sources.r2.interceptors.i1.value = ugchead
a1.sources.r2.interceptors.i2.type = timestamp

#对于source r3的配置描述 监听文件中的新增数据 exec
a1.sources.r3.type = exec
a1.sources.r3.command  = tail -F /home/uplooking/data/data-clean/data-ugctail.log
a1.sources.r3.interceptors = i1 i2
a1.sources.r3.interceptors.i1.type = static
##静态的在header中添加一个key value,下面就配置了两个拦截器,i1和i2
a1.sources.r3.interceptors.i1.key = type
a1.sources.r3.interceptors.i1.value = ugctail
a1.sources.r3.interceptors.i2.type = timestamp

#对于sink的配置描述 使用avro日志做数据的消费
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = uplooking03
a1.sinks.k1.port = 44444

#对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

#通过channel c1将source r1 r2 r3和sink k1关联起来
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

uplooking03的配置如下:

#########################################################
##
##主要作用是监听avro,采集到数据之后,输出到hdfs
##    注意:Flume agent的运行,主要就是配置source channel sink
##  下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#对于source的配置描述 监听avro
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

#对于sink的配置描述 使用log日志做数据的消费
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /input/data-clean/%type/%Y/%m/%d
a1.sinks.k1.hdfs.filePrefix = %type
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.inUseSuffix = .tmp
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 10485760
# 如果希望上面配置的日志文件滚动策略生效,则必须要配置下面这一项
a1.sinks.k1.hdfs.minBlockReplicas = 1
#配置下面两项后,保存到HDFS中的数据才是文本
#否则通过hdfs dfs -text查看时,显示的是经过压缩的16进制
a1.sinks.k1.hdfs.serializer = TEXT
a1.sinks.k1.hdfs.fileType = DataStream

#对于channel的配置描述 使用内存缓冲区域做数据的临时缓存
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

测试

首先需要保证uplooking01uplooking02上都能正常地产生日志。

uplooking03上启动Agent:

[[email protected] flume]$ flume-ng agent -n a1 -c conf --conf-file conf/flume-source-avro.conf -Dflume.root.logger=INFO,console

分别在uplooking01uplooking02上启动Agent:

flume-ng agent -n a1 -c conf --conf-file conf/flume-sink-avro.conf -Dflume.root.logger=INFO,console

一段时间后,可以在HDFS中查看相应的日志文件:

$ hdfs dfs -ls /input/data-clean
18/04/08 01:34:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
drwxr-xr-x   - uplooking supergroup          0 2018-04-07 22:00 /input/data-clean/access
drwxr-xr-x   - uplooking supergroup          0 2018-04-07 22:00 /input/data-clean/ugchead
drwxr-xr-x   - uplooking supergroup          0 2018-04-07 22:00 /input/data-clean/ugctail

查看某个日志目录下的日志文件:

[[email protected] data-clean]$ hdfs dfs -ls /input/data-clean/access/2018/04/07
18/04/08 01:35:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   3 uplooking supergroup    2447752 2018-04-08 01:02 /input/data-clean/access/2018/04/08/access.1523116801502.log
-rw-r--r--   3 uplooking supergroup       5804 2018-04-08 01:02 /input/data-clean/access/2018/04/08/access.1523120538070.log.tmp

可以看到日志文件数量非常少,那是因为前面在配置uplooking03的agent时,日志文件滚动的方式为,单个文件满10M再进行切分日志文件。

flume配置案例(代码片段)

...是flume2.flume的官方网站在哪里?3.flume有哪些术语?4.如何配置flume数据源码?一、什么是Flume?  flume作为cloudera开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume初始的发行版本目前被统称为FlumeOG(originalgeneration)... 查看详情

大数据技术之flumeflume概述flume快速入门(代码片段)

...入门2.1Flume安装部署2.1.1安装地址2.1.2安装部署2.2Flume入门案例2.2.1监控端口数据官方案例2.2.2实时监控单个追加文件2.3.3实时监控目录下多个新文件2.2.4实时监控目录下的多个 查看详情

数仓采集之环境搭建hadoop,zookeeper,kafka,flume(代码片段)

数仓采集之环境搭建hadoop,zookeeper,kafka前期的阿里云ECS环境已装好,现在开始正式搭建项目的环境hadoop安装配置1.集群规划服务器hadoop102服务器hadoop103服务器hadoop104HDFSNameNodeDataNodeDataNodeDataNodeSecondaryNameNodeYarnNodeManagerResourcemanage 查看详情

flume-1.8.0_部署与常用案例(代码片段)

...,除了hdfs+mapreduce+hive组成分析系统的核心之外,还需要数据采集、结果数 查看详情

flume(代码片段)

Flume概述Flume是一个高可用、高可靠、分布式的海量日志数据采集、聚合、传输的系统。Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接收方的能力。Flume(Agent... 查看详情

大数据高级开发工程师——数据采集框架flume(代码片段)

文章目录数据采集框架FlumeFlume基本介绍概述运行机制Flume采集系统结构图1.简单结构2.复杂结构Flume实战案例采集网络端口数据1.Flume的安装部署2.开发配置文件3.启动4.使用telnet测试采集目录到HDFS1.需求分析2.开发配置文件3.启动&... 查看详情

flume案例之采集特定目录的数据到hdfs

一,准备环境  CentOs7,jdk1.7,hadoop-2.6.1, apache-flume-1.6.0-bin.tar.gz二,编写配置文件    在/home/flume/conf的目录下 创建配置文件#定义三大组件的名称agent1.sources=source1agent1.sinks=sink1agent1.channel 查看详情

大数据技术之flumeflume概述flume快速入门(代码片段)

...入门2.1Flume安装部署2.1.1安装地址2.1.2安装部署2.2Flume入门案例2.2.1监控端口数据官方案例2.2.2实时监控单个追加文件2.3.3实时监控目录下多个新文件2.2.4实时监控目录下的多个追加文件1Flume概述1.1Flume定义Flume是Cloudera提供的一个高... 查看详情

大数据技术之flumeflume概述flume快速入门(代码片段)

...入门2.1Flume安装部署2.1.1安装地址2.1.2安装部署2.2Flume入门案例2.2.1监控端口数据官方案例2.2.2实时监控单个追加文件2.3.3实时监控目录下多个新文件2.2.4实时监控目录下的多个追加文件1Flume概述1.1Flume定义Flume是Cloudera提供的一个高... 查看详情

flume整合kafka(代码片段)

一、需求利用flume采集Linux下的文件信息,并且传入到kafka集群当中。环境准备zookeeper集群和kafka集群安装好。二、配置flume官网下载flume。博主自己这里使用的是flume1.6.0。官网地址http://flume.apache.org/download.html解压缩。tar-zxvfapache-f... 查看详情

大数据flume企业开发实战(代码片段)

...#64;hadoop102datas]$mkdirflume3(2)创建flume-file-flume.conf配置1个接收日志文件的source和两个channel、两个sink,分别输送给flume-flume-hd 查看详情

flume原理分析与使用案例(代码片段)

1、flume的特点:  flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS... 查看详情

打怪升级之小白的大数据之旅(七十二)<flume进阶>(代码片段)

...一下Flume的进阶知识点Flume的使用很简单,主要就是写配置文件,至于具体怎么配置,大家要善用官网。我再次介绍一下Fluem说明文档http://flume.apache.org/FlumeUs 查看详情

flume与kafka整合案例详解(代码片段)

环境配置名称版本下载地址Centos7.064x百度Zookeeper3.4.5Flume1.6.0Kafka2.1.0配置Flume这里就不介绍了零基础出门右转看Flume的文章flume笔记直接贴配置文件[root@zero239kafka_2.10-0.10.1.1]#cat/opt/hadoop/apache-flume-1.6.0-bin/conf/kafka-conf.prop 查看详情

打怪升级之小白的大数据之旅(七十二)<flume进阶>(代码片段)

...一下Flume的进阶知识点Flume的使用很简单,主要就是写配置文件,至于具体怎么配置,大家要善用官网。我再次介绍一下Fluem说明文档http://flume.apache.org/FlumeUserGuide.html官网文档是真的细,我的案例也是参考的官方示... 查看详情

flume集群安装监听测试(代码片段)

1、在已经搭建好集群基础上,配置监听机器配置,主机名hadoop1,flume-conf.properties配置文件#LicensedtotheApacheSoftwareFoundation(ASF)underone#ormorecontributorlicenseagreements.SeetheNOTICEfile#distributedwiththisworkforadditionalinformation#regardingcopyrightownership.The... 查看详情

flume案例总结。(代码片段)

文章目录一、Flume架构二、Flume传输过程三、角色类型&启动flume配置文件1)定义Agent2)定义Sources2.1)netcatSource2.2)execSource2.3)spooldirSource2.4)avroSource2.5)TaildirSource3ÿ 查看详情

springcloud之eureka集群搭建(代码片段)

前言Eureka集群搭建比较简单,每一台Eureka只需要在配置中指定另外多个Eureka的地址就可以实现一个集群的搭建了。采用之前的入门案例进行搭建,SpringCloud之Eureka入门案例详细步骤添加配置文件spring:application:#服务注册列... 查看详情