flume_企业中日志处理

author author     2022-08-14     210

关键词:

企业中的日志存放_1

201611/20161112.log.tmp
  第二天文件变为20161112.log与20161113.log.tmp
拷贝一份flume-conf.properties.template改名为dir-mem-hdfs.properties
实现监控某一目录,如有新文件产生则上传至hdfs,另外过滤掉新文件中tmp文件
dir-mem-hdfs.properties
  a1.sources = s1
  a1.channels = c1
  a1.sinks = k1
  # defined the source
  a1.sources.s1.type = spooldir
  a1.sources.s1.spoolDir = /opt/data/log_hive/20161109
  a1.sources.s1.includePattern = ([^ ]*.log$) # 包含某些字段
  a1.sources.s1.ignorePattern = ([^ ]*.tmp$)  # 忽略某些字段
  # defined the channel
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 1000
  a1.channels.c1.transactionCapacity = 1000
  # defined the sink
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.useLocalTimeStamp = true
  a1.sinks.k1.hdfs.path = /flume/spdir
  a1.sinks.k1.hdfs.fileType = DataStream 
  a1.sinks.k1.hdfs.rollInterval = 0
  a1.sinks.k1.hdfs.rollSize = 20480
  a1.sinks.k1.hdfs.rollCount = 0
  # The channel can be defined as follows.
  a1.sources.s1.channels = c1
  a1.sinks.k1.channel = c1
flmue目录下执行
  bin/flume-ng agent -c conf/ -n a1 -f conf/dir-mem-hdfs.properties -Dflume.root.logger=INFO,console
  这里使用了memory channel,可以使用file channel更加安全

 企业中的日志存放_2

201611/20161112.log
  第二天文件继续往20161112.log写
这样,既要使用exec和spoolingdir,如何处理
编译flume1.7版tail dir source,并集成到我们已有的flume环境
  1. window上下载安装git 
  2. 在某个目录下加一个空的文件夹(文件夹路径尽量不要有中文),例GitHub
  3. 使用github常用命令
    $ pwd
    $ ls
    $ cd /C/Users/Administrator/Desktop/GitHub
    $ git clone (https|git)://github.com/apache/flume.git
    $ cd flume
    $ git branch -r # 查看有哪些分支
    $ git branch -r # 查看当前属于哪个分支
    $ git checkout origin/flume-1.7 #别换分支
  拷贝flumeflume-ng-sourcesflume-taildir-source
  使用eclipse导入flume-taildir-source项目
  修改pom.xml
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.flume.flume-ng-sources</groupId>
  <artifactId>flume-taildir-source</artifactId>
  <version>1.5.0-cdh5.3.6</version>
  <name>Flume Taildir Source</name>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.5.0-cdh5.3.6</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  4. MAVEN_BULID项目,获取jar包并放到当前flume的环境中(lib目录) 
  5. 创建文件夹和文件
    $ mkdir -p /opt/cdh-5.6.3/apache-flume-1.5.0-cdh5.3.6-bin/position
    $ mkdir -p /opt/data/tail/hadoop-dir/
    $ echo "" > /opt/data/tail/hadoop.log
    拷贝一份flume-conf.properties.template改名为tail-mem-hdfs.properties
    可从源码看出需要的参数
      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      # defined the source
      a1.sources.s1.type = org.apache.flume.source.taildir.TaildirSource
      a1.sources.s1.positionFile = /opt/cdh-5.6.3/apache-flume-1.5.0-cdh5.3.6-bin/position/taildir_position.json
      a1.sources.s1.filegroups = f1 f2
      a1.sources.s1.filegroups.f1 = /opt/data/tail/hadoop.log
      a1.sources.s1.filegroups.f2 = /opt/data/tail/hadoop-dir/.*
      a1.sources.s1.headers.f1.headerKey1 = value1
      a1.sources.s1.headers.f2.headerKey1 = value2-1
      a1.sources.s1.headers.f2.headerKey2 = value2-2
      a1.sources.s1.fileHeader = true
      # defined the channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 1000
      # defined the sink
      a1.sinks.k1.type = hdfs
      a1.sinks.k1.hdfs.useLocalTimeStamp = true
      a1.sinks.k1.hdfs.path = /flume/spdir
      a1.sinks.k1.hdfs.fileType = DataStream 
      a1.sinks.k1.hdfs.rollInterval = 0
      a1.sinks.k1.hdfs.rollSize = 20480
      a1.sinks.k1.hdfs.rollCount = 0
      # The channel can be defined as follows.
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
  flmue目录下执行
    bin/flume-ng agent -c conf/ -n a1 -f conf/tail-mem-hdfs.properties -Dflume.root.logger=INFO,console
    测试文件或新数据

 企业中常用架构 Flume多sink

同一份数据采集到不同框架处理
采集source: 一份数据
管道channel: 多个
目标sink: 多个
如果多个sink从一个channel取数据将取不完整,而source会针对channel分别发送
设计: source--hive.log channel--file sink--hdfs(不同路径)
拷贝一份flume-conf.properties.template改名为hive-file-sinks.properties
hive-file-sinks.properties
  a1.sources = s1
  a1.channels = c1 c2
  a1.sinks = k1 k2
  # defined the source
  a1.sources.s1.type = exec
  a1.sources.s1.command = tail -F /opt/cdh-5.6.3/hive-0.13.1-cdh5.3.6/logs/hive.log
  a1.sources.s1.shell = /bin/sh -c
  # defined the channel 1
  a1.channels.c1.type = file
  a1.channels.c1.checkpointDir = /opt/cdh-5.6.3/apache-flume-1.5.0-cdh5.3.6-bin/datas/checkp1
  a1.channels.c1.dataDirs = /opt/cdh-5.6.3/apache-flume-1.5.0-cdh5.3.6-bin/datas/data1
  # defined the channel 2
  a1.channels.c2.type = file
  a1.channels.c2.checkpointDir = /opt/cdh-5.6.3/apache-flume-1.5.0-cdh5.3.6-bin/datas/checkp2
  a1.channels.c2.dataDirs = /opt/cdh-5.6.3/apache-flume-1.5.0-cdh5.3.6-bin/datas/data2
  # defined the sink 1
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = /flume/hdfs/sink1
  a1.sinks.k1.hdfs.fileType = DataStream 
  # defined the sink 2
  a1.sinks.k2.type = hdfs
  a1.sinks.k2.hdfs.path = /flume/hdfs/sink2
  a1.sinks.k2.hdfs.fileType = DataStream 
  # The channel can be defined as follows.
  a1.sources.s1.channels = c1 c2
  a1.sinks.k1.channel = c1
  a1.sinks.k2.channel = c2		
flmue目录下执行
  bin/flume-ng agent -c conf/ -n a1 -f conf/hive-file-sinks.properties -Dflume.root.logger=INFO,console
hive目录下执行
  bin/hive -e "show databases"		

flume_初识

企业架构数据源webserverRDBMS数据的采集shell、flume、sqoopjob监控和调度hue、oozie数据清洗及分析mapreduce、hive数据保存sqoop 概念:三大功能collecting(收集),aggregating(聚合),moving(传输)Flume是一个分布式的,可靠的,可用的,健壮且高容错性... 查看详情

flume日志导入elasticsearch

Flume配置。flume生成的数据结构<spanstyle="font-size:18px;">"_index":"logstash-2013.01.07","_type":"tms_jboss_syslog","_id":"a_M9X_0YSpmE7A_bEzIFiw","_score":1.0,"_source":{"@source":"file://localhost.loca 查看详情

日志采集框架flume

...捷的开源框架,如图所示:      1.日志采集框架Flume1.1Flume介绍1.1.1概述u Flume是一个分布式、可 查看详情

flume日志收集系统介绍

...本文的主角—Flume。本文将围绕Flume的架构、Flume的应用(日志采集 查看详情

flume集群日志收集

一、Flume简介  Flume是一个分布式的、高可用的海量日志收集、聚合和传输日志收集系统,支持在日志系统中定制各类数据发送方(如:Kafka,HDFS等),便于收集数据。其核心为agent,agent是一个java进程,运行在日志收集节点。... 查看详情

(01)flume简介

...e是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能... 查看详情

flume的介绍和简单操作(代码片段)

...e是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能... 查看详情

sparkstreaming+flume+kafka实时流式处理完整流程(代码片段)

...式处理完整流程一、前期准备二、实现步骤1.引入依赖2.日志收集服务器3.日志接收服务器4、spark集群处理接收数据并写入数据库5、测试结果sparkstreaming+flume+kafka实时流式处理完整流程一、前期准备1、环境准备,四台测... 查看详情

flume日志收集

Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。设计目标:(1)可靠性当节点出现... 查看详情

flume初始(代码片段)

一、Flume是什么Flume是一个数据,日志收集的一个组件,可以用于对程序,nginx等日志的收集,而且非常简单,省时的做完收集的工作。Flume是一个分布式、可靠、和高可用的海量日志采集聚合和传输的系统。支持在日志系统中定... 查看详情

日志抽取框架flume简介与安装配置(代码片段)

...e是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能... 查看详情

flume入门(代码片段)

...始学习flume!一、Flume简介  Flume作为cloudera开发的实时日志收集系统,受到了业界的认可与广泛应用。  Flume初始的发行版本目前被统称为FlumeOG(originalgeneration),属于cloudera。重构后的版本统称为FlumeNG (nextgeneration),... 查看详情

flume学习之路flume的基础介绍(代码片段)

...也是不可避免的一步。许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征:1)构建应用系统和分析系统的桥梁,... 查看详情

教你一步搭建flume分布式日志系统

  在前篇几十条业务线日志系统如何收集处理?中已经介绍了Flume的众多应用场景,那此篇中先介绍如何搭建单机版日志系统。环境  CentOS7.0   Java1.8下载  官网下载http://flume.apache.org/download.html  当前最新版&n... 查看详情

flume安装与使用

 1.flume简介Flume是Cloudera提供的日志收集系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume是一个分布式、可靠、和高可用... 查看详情

教你一步搭建flume分布式日志系统(代码片段)

  在前篇几十条业务线日志系统如何收集处理?中已经介绍了Flume的众多应用场景,那此篇中先介绍如何搭建单机版日志系统。环境  CentOS7.0   Java1.8下载  官网下载http://flume.apache.org/download.html  当前最新版&n... 查看详情

flume入门(代码片段)

2018-12-31  15:29:44Flume 百度百科:flume(日志收集系统)Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume... 查看详情

数据湖:海量日志采集引擎flume

...1.概述    Flume是的一个分布式、高可用、高可靠的海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据,同时提供了对数据进行简单处理并写到各种数据接收方的能力。    ... 查看详情