flume,sqoop学习以及应用(代码片段)

shun7man shun7man     2023-04-24     301

关键词:

学习文档参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

1.Flume是什么?

Flume简单概括就是一个收集日志的工具,它可以通过调用接口,RPC,还有网页的一些操作进行日志的收集。它是一个分布式开源的Java编写的由Apache维护的项目。

2.Flume如何搭建

搭建前提条件

2.1下载并解压到指定目录

崇尚授人以渔的思想,我说给大家怎么下载就行了,就不直接放连接了,大家可以直接输入官网地址 http://flume.apache.org ,一般在官网的上方或者左边都会有Download按钮,这个在左侧,然后点进去下载想要的版本即可。 这个会有点慢,如果嫌弃的化,可以通过相关镜像网站进行下载,可以百度搜索软件镜像,就能搜到很多镜像网站,在里面就可以下载,如果你下载的东西属于Apache旗下的,可以看的有专门的一个Apache目录,里面存的都是Apache旗下相关产品。

可以先本地下载,然后通过ftp上传,也可以直接在服务器下载。

我这里下载好后,解压到了服务器/opt 目录下面,并修改了下目录名称为flume(你也可以不改,配置环境变量的时候按照实际情况来就行了。)

2.2在flume-env.sh里面配置Java路径

进入到 conf目录下面,对flume-env.sh进行编辑,将export JAVA_HOME修改为实际路径。

cd /opt/flume/conf/

vim flume-env.sh

export JAVA_HOME=/opt/java/jdk1.8.0_221

2.3添加Flume环境变量

环境变量存放的是软件的具体位置,运行程序命令会根据配置的变量找到软件并执行,否则会报错。(通过手动下载并上传到Linux服务器的都需要配置环境变量。)

vim /etc/profile

export FLUME_HOME=/opt/flume

export FLUME_CONF_DIR=/opt/flume/conf

PATH=$FLUME_HOME/bin

source /etc/profile

2.4通过flume-ng version验证是否配置成功

直接控制台运行 flume-ng version

显示Flume 1.6.0 就好了,如果显示了Error什么报错信息先不用管。

3.Flume应用

日志采集系统:

技术图片

3.1配置nginx环境

请参考菜鸟教程: https://www.runoob.com/linux/nginx-install-setup.html

按照上述步骤安装完后,需要对nginx配置下访问日志格式:

编辑nginx.conf,默认安装路径在/etc/nginx下

cd /etc/nginx

vim nginx.conf

在http模块下面添加:

解析:(以^A为日志分隔符,remote_addr代表远程地址,msec代表访问时间,http_host代表访问主机名,request_uri代表访问资源)

log_format my_format ‘$remote_addr^A$msec^A$http_host^A$request_uri‘;

在server模块下面添加:

解析:(访问地址 域名/log.gif,请求格式是image,存放地址是/opt/data/access.log )

location =/log.gif default_type image/gif; access_log /opt/data/access.log my_format;

这样访问nginx的时候就会生成类似下面的内容:

192.168.40.1^A1577365502.563^Atuge1^A/log.gif?en=e_crt&oid=123456&on=%E6%B5%8B%E8%AF%95%E8%AE%A2%E5%8D%95123456&cua=524.01&cut=RMB&pt=alipay&ver=1&pl=website&sdk=js&u_ud=039F6588-ED65-4187-87CF-9DBBC9F19645&u_mid=zhangsan&u_sd=605DECAA-93C0-46B7-AC47-7B1898DBD6BC&c_time=1577365502881&l=zh-CN&b_iev=Mozilla%2F5.0%20(Windows%20NT%2010.0%3B%20WOW64)%20AppleWebKit%2F537.36%20(KHTML%2C%20like%20Gecko)%20Chrome%2F78.0.3904.97%20Safari%2F537.36&b_rst=1536*86

3.2编写触发事件传输代码

代码思路:

  • 前端通过触发事件,生成image格式数据发送。
  • 后端通过API接口直接发送Get请求。

代码连接: https://gitee.com/shuai7boy/BIG_DATA_LOG

3.3配置Flume环境

  • 下载

    官网下载,然后上传到服务器并进行解压。

  • 在flume配置文件中配置java环境变量

    找到flume-env.sh进行配置,什么?没有?没关系,将flume-env.sh.template重命名下就行了。

  • 配置flume环境变量

    vim /etc/profile

    export FLUME_HOME=/opt/flume export FLUME_CONF_DIR=/opt/flume/conf

    PATH=$FLUME_HOME/bin

  • 验证是否配置成功

    flume-ng version

    当弹出以下内容,则说明配置成功。

    Flume 1.6.0 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080 Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015 From source with checksum b29e416802ce9ece3269d34233baf43f

3.4运行Flume,将本地日志写入HDFS

编写flume代码,参考官方案例: http://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html

在安装Linux的服务器上,创建一个监控文件(姑且取名optionHdfs.conf):

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/access.log --监控文件路径,新增加内容就会往hdfs里面写。

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://tuge2:9000/flume/webdata/%Y-%m-%d --填写active NameNode
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

运行flume,同样参考官方来就行:flume-ng agent --conf conf --conf-file optionHdfs.conf --name a1 -Dflume.root.logger=INFO,console

运行后就按照文件规则将日志里面的内容导入hdfs里了。

3.5编写ETL代码,将HDFS内容导入到HBase里面

代码思路:

  • 使用Map/Reduce将hdfs中的内容提取出来进行分割处理,然后Map到HBase里面。

代码连接:https://gitee.com/shuai7boy/BIG_DATA_ETL

3.6使用Map/Reduce将HBase数据分析处理后导入到MySql

数据和维度:

  • 用户某段时间活跃量
  • 用户某段时间基于某个浏览器的活跃量
  • 用户某段时间新增认数
  • 用户某段时间基于某个浏览器的新增人数

主要拿用户时间活跃量和用户某段时间基于某个浏览器的活跃量来讨论。

代码思路:

  • 定义来源:

    在Runner类里面定义来源为HBase。

  • 定义维度

    将基于用户活跃度,基于某平台的用户活跃度,新增用户数,基于某平台的新增用户数等维度进行设定类。

  • 进行Map:

    继承TableMapper

    根据查询的数据来源,映射成 维度+用户信息。

  • 进行Reduce:

    继承Reducer

    计算成 维度+用户去重。

  • 进行To MySql:

    继承OutputFormat重写getRecordWriter,checkOutputSpecs,getOutputCommitter

    继承RecordWriter重写write和close

    继承IDimensionConverter重写getDimensionIdByValue,executeSql

    在Runner里面定义写入MySql。SQL语句都定义在了配置文件里面,根据维度进行调用。

    首先判断各个维度是否存在,不存在先写入维度信息。 然后就是写入更新统计信息(每映射10条更新一次。)

    代码链接: https://gitee.com/shuai7boy/BIG_DATA_TOMYSQL

4.Sqoop是什么?

Sqoop是一款导入导出数据的工具,可以将MySql,Oracel等关系型数据据导入到HDFS,Hive,HBase里面。

官方文档: http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html

5.使用Sqoop将HBase数据计算并导入MySql

5.0安装Sqoop

1.在Linux服务器对Sqoop进行解压

我这里下载的是1.4.6版本。**请注意下载的时候一定要安装sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz 压缩包,不要下载 sqoop-1.4.6.tar.gz 压缩包,因为这个压缩包少东西。**

下载地址 http://archive.apache.org/dist/sqoop/1.4.6/

下载好后使用ftp工具将文件上传到Linux服务器上(我上传到了/opt/sqoop下面)

然后使用tar -xvf sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz 进行解压。

2.修改配置文件名字

进入到conf文件夹下,

mv sqoop-env-template.sh sqoop-env.sh

3.配置环境变量

vim /etc/profile

添加:

export SQOOP_HOME=/opt/sqoop/sqoop-1.4.6

path=$SQOOP_HOME/bin

4.校验是否安装成功

sqoop --version

(如果出现异常,请根据异常修改bin下面的configure-sqoop)

5.1在Hive里面创建数据表进行存数据

HBase表结构:Row,Name(列族+限定符),timestamp,Value

5.1.0在hive中创建hbase的eventlog对应表,并进行hive表和hbase表关联

hive和hbase表关联官方文档: https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration

CREATE EXTERNAL TABLE event_logs(
key string, pl string, en string, s_time bigint, p_url string, u_ud string, u_sd string
) ROW FORMAT SERDE ‘org.apache.hadoop.hive.hbase.HBaseSerDe‘
STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler‘
with serdeproperties(‘hbase.columns.mapping‘=‘:key,log:pl,log:en,log:s_time,log:p_url,log:u_ud,log:u_sd‘)
tblproperties(‘hbase.table.name‘=‘eventlog‘);

5.1.1创建一个中间表(PS:要和MySql表结构保持一致,计算结果存放此表并同步MySql)

CREATE TABLE stats_view_depth (
platform_dimension_id bigint ,
data_dimension_id bigint ,
kpi_dimension_id bigint ,
pv1 bigint ,
pv2 bigint ,
pv3 bigint ,
pv4 bigint ,
pv5_10 bigint ,
pv10_30 bigint ,
pv30_60 bigint ,
pv60_plus bigint ,
created string
) row format delimited fields terminated by ‘ ‘;

5.1.2创建一个临时表(PS:存放中间结果)

CREATE TABLE stats_view_depth_tmp(pl string, date string, col string, ct bigint);

5.2编写platformdimension和datedimension

注:要继承udf

public class DateDimensionUDF extends UDF 
IDimensionConverter dimension=new DimensionConverImpl();
public IntWritable evaluate(Text txt) 
    try 
        DateDimension dateDimension=DateDimension.buildDate(TimeUtil.parseString2Long(txt.toString()), DateEnum.DAY);
        
        int id= dimension.getDimensionIdByValue(dateDimension);
        return new IntWritable(1);
        
    catch(IOException ex)
        throw new RuntimeException("获取datedimension id异常");
           


将编写的内容进行打包上传到linux服务器

5.3创建hive的function

create function date_convert as ‘shuai7boy.vip.transformer.hive.DateDimensionUDF‘ using jar ‘hdfs://tuge2:9000/transform/transform-0.0.1.jar‘;

其中一开始没加端口号报错:java.lang.IllegalArgumentException: java.net.UnknownHostException: transform ,然后参考博文 https://blog.csdn.net/heming621/article/details/53317562 解决了。

5.4编写HQL语句进行计算

5.4.1根据用户的角度统计每个页面的浏览量

上面我们将HDFS数据导入到了HBase里面,并且做了Hive表和HBase表同步,又因为Hive表支持HQL语句。所在在Hive里面使用HQL语句就能进行分析。

计算用户的浏览深度

from (
select
pl, from_unixtime(cast(s_time/1000 as bigint),‘yyyy-MM-dd‘) as day, u_ud,
(case when count(p_url) = 1 then "pv1"
when count(p_url) = 2 then "pv2"
when count(p_url) = 3 then "pv3"
when count(p_url) = 4 then "pv4"
when count(p_url) >= 5 and count(p_url) <10 then "pv5_10"
when count(p_url) >= 10 and count(p_url) <30 then "pv10_30"
when count(p_url) >=30 and count(p_url) <60 then "pv30_60"
else ‘pv60_plus‘ end) as pv
from event_logs
where
en=‘e_pv‘
and p_url is not null
and pl is not null
and s_time >= unix_timestamp(‘2016-06-08‘,‘yyyy-MM-dd‘)1000
and s_time < unix_timestamp(‘2016-06-09‘,‘yyyy-MM-dd‘)
1000
group by
pl, from_unixtime(cast(s_time/1000 as bigint),‘yyyy-MM-dd‘), u_ud
) as tmp
insert overwrite table stats_view_depth_tmp
select pl,day,pv,count(distinct u_ud) as ct where u_ud is not null group by pl,day,pv;

--将行转列

with tmp as
(
select pl,date as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv1‘ union all
select pl,date as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv2‘ union all
select pl,date as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv3‘ union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv4‘ union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv5_10‘ union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv10_30‘ union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv30_60‘ union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col=‘pv60_plus‘ union all

select ‘all‘ as pl,date as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv1‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv2‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv3‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv4‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv5_10‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv10_30‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col=‘pv30_60‘ union all
select ‘all‘ as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col=‘pv60_plus‘
)
from tmp
insert overwrite table stats_view_depth
select 2,date_convert(date1),6,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),‘2017-01-10‘ group by pl,date1;

执行可能出现异常:Failed to recognize predicate ‘xxx‘. Failed rule: ‘identifier‘ in column specification。

解决方案:主要原因是使用了date关键字导致的,弃用保留关键字即可。

在hive-site.xml里面添加如下命令:


hive.support.sql11.reserved.keywords
false

参考博客: https://blog.csdn.net/sjf0115/article/details/73244762

5.5使用Sqoop将hive中的数据同步到MySql里面

退出hive命令,执行以下语句:

sqoop export --connect jdbc:mysql://tuge1:3306/result_db --username root --password 123456 --table stats_view_depth --export-dir /user/hive/warehouse/stats_view_depth/* --input-fields-terminated-by " " --update-mode allowinsert --update-key platform_dimension_id,data_dimension_id,kpi_dimension_id;

然后就能在MySql里面看到数据了,以后的事情就是把数据在平台渲染下,这里就不演示了。

flume和sqoop(代码片段)

 Sqoop简介Sqoop是一种旨在有效地在ApacheHadoop和诸如关系数据库等结构化数据存储之间传输大量数据的工具原理:将导入或导出命令翻译成Mapreduce程序来实现。  在翻译出的Mapreduce中主要是对InputFormat和OutputFormat进行定制RDBMS到... 查看详情

使用 SQOOP 和 FLUME 将数据从 RDBMS 移动到 Hadoop

...dFLUME【发布时间】:2014-03-1803:41:23【问题描述】:我正在学习Hadoop,并且在将数据从关系数据库移动到Hadoop以及反之亦然的过程中遇到了一些概念。我已经使用SQOOP导入查询将文件从MySQL传输到HDFS。我传输的文件是结构化数据集... 查看详情

flume学习(代码片段)

备忘录官方网站在linux的flume目录下,有一个docs文件夹,里面是一整个当前flume版本的引导文件,可以查看flume所有的属性hadoop在$HADOOP_HOME/share/doc/hadoopflume在$HADOOP_HOME/docssqoop在$SQOOP_HOME/docsspark在$SPARK_HOME/R/lib/SparkR1.flume... 查看详情

flume学习之路flume的基础介绍(代码片段)

一、背景Hadoop业务的整体开发流程:从Hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据的采集是十分重要的一步,也是不可避免的一步。许多公司的平台每天会产生大量的日志(一般为流式数据,如,... 查看详情

hadoop数据收集与入库系统flume与sqoop

...对存储格式没有要求。可以存储用户访问日志、产品信息以及网页数据等数据。    常见的两种数据来源。一种是分散的数据源:机器产生的数据、用户访问日志以及用户购买日志。另一种是传统系统中的数据:传统关... 查看详情

flume学习(代码片段)

简介:1.Flume原本是Cloudera公司开发的后来贡献给了Apache的一套分布式的、可靠的、针对日志数据进行收集、汇聚和传输的机制2.在大数据中,实际开发中有超过70%的数据来源于日志-日志是大数据的基石3.Flume针对日志提供... 查看详情

flume学习之路flume的配置方式(代码片段)

一、单一代理流配置1.1 官网介绍http://flume.apache.org/FlumeUserGuide.html#avro-source通过一个通道将来源和接收器链接。需要列出源,接收器和通道,为给定的代理,然后指向源和接收器及通道。一个源的实例可以指定多个通道,但只... 查看详情

flume学习之路flume的source类型(代码片段)

一、概述官方文档介绍:http://flume.apache.org/FlumeUserGuide.html#flume-sources二、FlumeSources描述2.1 AvroSource2.1.1 介绍Avro端口监听并接收来自外部的Avro客户流的事件。当内置Avro去Sinks另一个配对Flume代理,它就可以创建分层采集的拓扑... 查看详情

sqoop学习之路(代码片段)

一、概述二、工作机制三、安装1、前提概述2、软件下载3、安装步骤四、Sqoop的基本命令基本操作示例五、Sqoop的数据导入1、从RDBMS导入到HDFS中2、把MySQL数据库中的表数据导入到Hive中3、把MySQL数据库中的表数据导入到hbase 正文... 查看详情

flume学习之路flume的基础介绍(代码片段)

目录一、背景二、Flume的简介三、FlumeNG的介绍3.1 Flume特点3.2 Flume的一些核心概念3.3 FlumeNG的体系结构3.4 Source3.5 Channel3.6 Sink四、Flume的部署类型4.1 单一流程4.2 多代理流程(多个agent顺序连接)4.3 流的合并(多个Agent... 查看详情

数据集成:flume和sqoop

Flume和Sqoop是Hadoop数据集成和收集系统,两者的定位不一样,下面根据个人的经验与理解和大家做一个介绍:Flume由cloudera开发出来,有两大产品:Flume-og和Flume-ng,Flume-og的架构过于复杂,在寻问当中会有数据丢失,所以放弃了。... 查看详情

flume入门(代码片段)

前言:最近有些浮躁,大环境变化无常,这种情况下唯有学习才是王道,好吧,开始学习flume!一、Flume简介  Flume作为cloudera开发的实时日志收集系统,受到了业界的认可与广泛应用。  Flume初始的发行版本目前被统称为Flume... 查看详情

sqoop应用(代码片段)

1.导入数据(将mysql(rdbms)的表的数据导入到hdfs)1.1.全部导入(注意空格)sqoopimport--connectjdbc:mysql://192.168.159.110:3306/stu\(stu数据库名称写自己的ip)--usernameroot--password123456--tablestudents--target-dir/user/test3\(导入到hdfs位置 查看详情

flume架构以及应用介绍[转]

在具体介绍本文内容之前,先给大家看一下Hadoop业务的整体开发流程: 从Hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据的采集是十分重要的一步,也是不可避免的一步,从而引出我们本文的主角... 查看详情

flume架构以及应用介绍(转)

在具体介绍本文内容之前,先给大家看一下Hadoop业务的整体开发流程:从Hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据的采集是十分重要的一步,也是不可避免的一步,从而引出我们本文的主角—Flu... 查看详情

sqoop的介绍以及部署安装(代码片段)

1.sqoop的介绍(1)介绍:  Sqoop是Apache旗下的一款“hadoop和关系型数据库服务器之间传送数据”的工具。  导入数据:MySQL、Oracle导入数据到hadoop的hdfs、hive、HBASE等数据存储系统。  导出数据:从hadoop的文件... 查看详情

hadoop安装mysqlhive以及sqoop(步骤图文超详细版)(代码片段)

还没有搭建Hadoop,欢迎看看我前面的一篇文章:Hadoop集群搭建(步骤图文超详细版)目录一、前置条件二、安装Mysql三、安装Hive四、安装Sqoop一、前置条件需要安装下载方法Hive-1.2.1官网下载链接,提取码:nzyuSqoop-1.4.... 查看详情

flume从入门到实战(代码片段)

...为hadoop相关组件之一。尤其近几年随着flume的不断被完善以及升级版本的逐一推出,特别是flume-ng;,同时 查看详情