flume原理分析与使用案例(代码片段)

gxyandwmm gxyandwmm     2022-12-09     719

关键词:

1、flume的特点:

  flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

flume的数据流由事件(Event)贯穿始终。Event是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。

   flume的可靠性 :

  当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Besteffort(数据发送到接收方后,不会进行确认)。

  flume的可恢复性:

  还是靠Channel。推荐使用FileChannel,事件Event持久化在本地文件系统里(性能较差)。 

 

  flume的一些核心概念:

  Agent使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。

  Client生产数据,运行在一个独立的线程。

  Source从Client收集数据,传递给Channel。

  Sink从Channel收集数据,运行在一个独立线程。

  Channel连接 sources 和 sinks ,这个有点像一个队列。

  Events可以是日志记录、 avro 对象等。

  Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成,如下图:

 

  值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source、Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,这也正是NB之处。如下图所示:

技术图片

 

 2、flume的案例

  Spool 监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:

  1) 拷贝到spool目录下的文件不可以再打开编辑。
  2) spool目录下不可包含相应的子目录。

############################################

(a)log4j配置:

   我使用log4j的DailyRollingFileAppender去每分钟生成一个日志到配置的目录下,代码如下:

#输出信息到文件
log4j.appender.file = org.apache.log4j.DailyRollingFileAppender
#这个是生成日志文件的目录及文件名
log4j.appender.file.File = /Users/jsj/eclipse-workspace/log4j/src/main/java/testlog.log
log4j.appender.file.Append = true
#每分钟产生一个日志文件
#当前的文件名是testlog.log,前面分钟产生的文件是这种命名形式testlog.log.2018-08-20-18-16。
log4j.appender.file.DatePattern = ‘.‘yyyy-MM-dd-HH-mm
log4j.appender.file.layout = org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern = [%-5p]   %-dyyyy-MM-dd HH:mm:ss    %m%n

(b)模拟产生日志:

日志的内容(不含log4j中的配置)为:0a58f82b-ff6f-4feb-abe2-7c6ac9a0c24d####ERH####qhp####6677062格式为:用户ID--县号--镇号--收入

 

public class Main 

    public static void main(String[] args) throws Exception 

        Thread thread = new Thread(new GenerateRecord());
        thread.start();
    


class GenerateRecord extends Thread 

    private final Logger log = Logger.getLogger(GenerateRecord.class);

    public void run() 
        while (true) 
            // 随机产生一个用户uuid
            UUID userId = UUID.randomUUID();
            System.out.println(userId.toString().length());
            // 产生一个随机的用户总资产
            int num = (int) (Math.random() * 10000000) + 100000;
            // 产生一个随意的县名
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 3; i++) 
                char a = (char) (Math.random() * (90 - 65) + 65);
                sb.append(a);
            
            String county = sb.toString();
            // 产生一个随机的镇名
            StringBuilder sb1 = new StringBuilder();
            for (int i = 0; i < 3; i++) 
                char a = (char) (Math.random() * (122 - 97) + 97);
                sb1.append(a);
            
            String town = sb1.toString();
            // 生成日志
            log.info(userId + "####" + county + "####" + town + "####" + num);
            // 停1秒钟
            try 
                Thread.sleep(1000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
    

 

在几分钟后停掉程序,在终端输入cd /Users/jsj/eclipse-workspace/log4j/src/main/java/查看生成的文件 ls -1 ,如下:

 
技术图片

 

(c)创建agent配置文件:

在flume安装目录的conf/flume.conf下加入如下代码:

----------------------------------------------------------------

# my application flume configuration
#agent2是我们给agent起的名字
agent2.sources=source2
agent2.sinks=sink2
agent2.channels=channel2

#Spooling Directory
#set source2
#设置type为spooldir,这个值是flume给定的alias
agent2.sources.source2.type=spooldir
#设置监控目录,注意和前面log4j的目录不同
agent2.sources.source2.spoolDir=/Users/jsj/eclipse-workspace/logs

agent2.sources.source2.channels=channel2
agent2.sources.source2.fileHeader = false

#set sink2
agent2.sinks.sink2.type=hdfs
agent2.sinks.sink2.hdfs.path=hdfs://localhost:9000/flume
agent2.sinks.sink2.hdfs.fileType=DataStream
agent2.sinks.sink2.hdfs.writeFormat=TEXT
agent2.sinks.sink2.hdfs.rollInterval=60
agent2.sinks.sink2.channel=channel2
#设置存储到HDFS后文件的前缀
agent2.sinks.sink2.hdfs.filePrefix=%Y-%m-%d

#set channel2
#设置内存通道
agent2.channels.channel2.type=memory
agent2.channels.channel2.capacity=10000
agent2.channels.channel2.transactionCapacity=1000
agent2.channels.channel2.keep-alive=30

 ----------------------------------------------------------------

启动服务:

 

./flume-ng agent -c ../conf -f ../conf/flume.conf -Dflume.root.logger=INFO,console  -n agent2

观察日志:

此时flume的终端会嗖嗖嗖的刷日志,我截下来几条,主要是打开文件,对正在处理的文件改名为.tmp后缀,上传到HDFS后把HDFS上文件的.tmp删掉,本地的监控目录下文件加.COMPLETED后缀。

 

观察HDFS:

这时候我们去HDFS上检查一下:新开个终端输入hadoop fs -ls /flume,发现生成了比我们文件数多的多的文件,原来只有11个,现在有62个文件。

flume基础:自定义interceptor(代码片段)

...此时会用到Flume拓扑结构中的Multiplexing结构,Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Chan 查看详情

大数据flume企业开发实战(代码片段)

目录1复制和多路复用1.1案例需求1.2需求分析:单数据源多出口案例(选择器)1.3实现步骤2负载均衡和故障转移2.1案例需求2.2需求分析:故障转移案例2.3实现步骤3聚合3.1案例需求3.2需求分析:多数据源汇总案例3.3实现步... 查看详情

flume与kafka整合案例详解(代码片段)

环境配置名称版本下载地址Centos7.064x百度Zookeeper3.4.5Flume1.6.0Kafka2.1.0配置Flume这里就不介绍了零基础出门右转看Flume的文章flume笔记直接贴配置文件[root@zero239kafka_2.10-0.10.1.1]#cat/opt/hadoop/apache-flume-1.6.0-bin/conf/kafka-conf.prop 查看详情

flume配置案例(代码片段)

问题导读1.什么是flume2.flume的官方网站在哪里?3.flume有哪些术语?4.如何配置flume数据源码?一、什么是Flume?  flume作为cloudera开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume初始的发行版本目前被统称为FlumeOG(o... 查看详情

打怪升级之小白的大数据之旅(七十三)<flume高级>(代码片段)

...据之旅(七十三)Flume高级上次回顾上一章介绍了Flume的内部原理,本章就Flume的扩展知识进行讲解,本章的重点就是了解并学会使用Flume的自定义组件自定义组件在上一章介绍了内部原理,所以下面我们就可以根据内部... 查看详情

flume(代码片段)

...下的多个追加文件第3章Flume进阶3.1Flume事务3.2FlumeAgent内部原理3.3Flume拓扑结构3.4Flume企业开发案例3.4.1复制和多路复用3.4.2负 查看详情

大数据flume自定义类型(代码片段)

...会用到Flume拓扑结构中的Multiplexing结构,Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值... 查看详情

flume(代码片段)

...下的多个追加文件第3章Flume进阶3.1Flume事务3.2FlumeAgent内部原理3.3Flume拓扑结构3.4Flume企业开发案例3.4.1复制和多路复用3.4.2负载均衡和故障转移3.4.3聚合3.5自定义Interceptor(拦截器)3.6自定义Source3.7自定义Sink(更多自定... 查看详情

互联网大数据日志收集离线实时分析实战案例

本文通过这个项目可以学到那些东西:Flume配置与使用Kafka配置与使用Kafka与Flume整合KafkaJavaAPI调用Hadoop搭建配置Mapreduce离线分析/数据清洗Storm 实时分析Sqoop迁移数据Hbase配置与使用 查看详情

flume的安装与flume监控端口数据官方案例(代码片段)

...备二:安装步骤三.验证安装与Flume监控端口数据官方案例Flume是一个分布式、可靠、和高可用的海量日志采集、聚和和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据。同时,Flume提供对数据进行... 查看详情

flume案例1-arvo(代码片段)

...以定制avro-client发送一个指定的文件给Flumeagent,Avro源使用AvroRPC机制,Flume主要的RPCS 查看详情

flume案例1-arvo(代码片段)

...以定制avro-client发送一个指定的文件给Flumeagent,Avro源使用AvroRPC机制,Flume主要的RPCS 查看详情

flume(代码片段)

...的多个追加文件-taildir3Flume进阶3.1Flume事务3.2FlumeAgent内部原理3.3Flume拓扑结构3.3.1简单串联3.3.2复制和多路复用3.3.3负载均衡和故障转移3.3.4聚合3.4Flume企业开发案例3.4.1复制和多路复用3.4.2负载均衡和故障转移3.4.3聚合3.5自定义Interce... 查看详情

flume+kafka+storm+redis大数据在线实时分析(代码片段)

1、实时处理框架即从上面的架构中我们可以看出,其由下面的几部分构成:Flume集群Kafka集群Storm集群从构建实时处理系统的角度出发,我们需要做的是,如何让数据在各个不同的集群系统之间打通(从上面的图示中也能很好地... 查看详情

大数据技术之flume(代码片段)

...用2.1.2在flume目录下创建job文件夹并且创建flume文件。2.1.3使用netcat工具向本机的44444端口发送内容2.2案例22.3案例32 查看详情

java定时器使用与原理分析(代码片段)

如何使用首先定义一个类,继承TimerTaskstaticclassMyTimerTaskextendsTimerTaskprivatefinalMyWebSocketClientclient;publicMyTimerTask(MyWebSocketClientclient)this.client=client;@Overridepublicvoidrun()Sys 查看详情

java定时器使用与原理分析(代码片段)

如何使用首先定义一个类,继承TimerTaskstaticclassMyTimerTaskextendsTimerTaskprivatefinalMyWebSocketClientclient;publicMyTimerTask(MyWebSocketClientclient)this.client=client;@Overridepublicvoidrun()Sys 查看详情

java定时器使用与原理分析(代码片段)

如何使用首先定义一个类,继承TimerTaskstaticclassMyTimerTaskextendsTimerTaskprivatefinalMyWebSocketClientclient;publicMyTimerTask(MyWebSocketClientclient)this.client=client;@Overridepublicvoidrun()Sys 查看详情