打怪升级之小白的大数据之旅(七十三)<flume高级>(代码片段)

GaryLea GaryLea     2022-11-23     191

关键词:

打怪升级之小白的大数据之旅(七十三)

Flume高级

上次回顾

上一章介绍了Flume的内部原理,本章就Flume的扩展知识进行讲解,本章的重点就是了解并学会使用Flume的自定义组件

自定义组件

在上一章介绍了内部原理,所以下面我们就可以根据内部原理来制定自定义的组件,例如上一章说的Channel选择器中的多路复用,就是需要搭配自定义拦截器Interceptor来使用

自定义 Interceptor

在实际开发中,自定义拦截器算是我们比较常用的手段,它可以配合channel选择器来将我们的日志信息分类存储,下面就通过案例来模拟实现此功能

案例需求:

  • 使用Flume采集服务器本地日志
  • 需要按照日志类型的不同,将不同种类的日志发往不同的分析系统

案例分析:

  • 实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统
  • 此时会用到Flume拓扑结构中的Multiplexing(多路复用)
  • Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值
  • 在该案例中,我们以端口数据模拟日志,以hello和非hello信息模拟不同类型的日志,我们需要自定义interceptor来对日志内容进行区分,将其分别发往不同的分析系统(Channel)


案例实现:

  • 既然是自定义拦截器,那么我们就需要写Java代码了,创建一个Maven工程,然后导入依赖:
 <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>
  • 自定义拦截器类
package com.company.myinterceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyInterceptor implements Interceptor 
    // 声明存储事件集合
    private  List<Event> addHeaderEvents;

    // 初始化拦截器
    @Override
    public void initialize() 
        // 初始化集合
        addHeaderEvents = new ArrayList<>();
    

    // 处理单个事件
    @Override
    public Event intercept(Event event) 
        // 获取事件的头信息
        Map<String, String> headers = event.getHeaders();

        // 获取事件的body信息
        String eventBody = new String(event.getBody());

        if (eventBody.contains("hello"))
            headers.put("type","hello");
        else 
            headers.put("type","other");
        
        return event;
    

    // 批量处理事件
    @Override
    public List<Event> intercept(List<Event> events) 
        // 清空集合
        addHeaderEvents.clear();

        // 遍历events,对每个event添加头信息
        for (Event event : events) 
            // 添加头信息
            addHeaderEvents.add(intercept(event));

        

        // 返回events
        return addHeaderEvents;
    

    // 关闭资源
    @Override
    public void close() 

    

    // 拦截器类的构造方法
    public static class MyBuilder implements Interceptor.Builder

        @Override
        public Interceptor build() 
            return new MyInterceptor();
        

        @Override
        public void configure(Context context) 

        
    


将写好的拦截器类打包,改个名字,放到flume中

名字改为 myInterceptor.jar
放到 /opt/module/flume/lib文件夹下

配置flume文件:先在hadoop102, hadoop103,hadoop104.创建一个文件夹,存储配置文件

mkdir /opt/module/flume/job/group4

Flume1

vim /opt/module/flume/job/group4/flume1-netcat.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.company.myinterceptor.MyInterceptor$MyBuilder



a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.hello = c1
a1.sources.r1.selector.mapping.other= c2




# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141

a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4142

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Flume2

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 4141

a2.sinks.k1.type = logger

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.channel = c1
a2.sources.r1.channels = c1

Flume3

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4142

a3.sinks.k1.type = logger

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1

运行Flume

# flume3
flume-ng agent -n a3 -c /opt/module/flume/conf/ -f /opt/module/flume/job/group4/flume3-console2.conf -Dflume.root.logger=INFO,console

# flume2
flume-ng agent -n a2 -c /opt/module/flume/conf/ -f /opt/module/flume/job/group4/flume2-console1.conf -Dflume.root.logger=INFO,console

# flume1
flume-ng agent -c /opt/module/flume/conf/ -f /opt/module/flume/job/group4/flume1-netcat.conf -n a1 -Dflume.root.logger=INFO,console

测试数据

# 模拟数据产生,利用natcat向44444端口发送数据
nc localhost 44444
hello
hello world
hive
flume

自定义source

官方提供的Source有很多,如exec source 、avro source、Taildir source等,如果官方提供的source不能满足我们的需求,我们就可以根据需求自定义source(目前为止,我还没有遇到官方案例解决不了的需求)

自定义source的说明文档: https://flume.apache.org/FlumeDeveloperGuide.html#source

自定义source案例实现

案例需求:

  • 使用flume接收数据,并给每条数据添加前缀,输出到控制台
  • 前缀可从flume配置文件中配置

需求分析:

  • 因为配置Flume主要是通过配置文件来完成相应的数据传输,所以为了进行自定义Source,就需要知道实现该Source的类是什么,具体实现方法是什么
  • 通过源码分析,Source的实现底层是抽象类AbstractSource和Configurable与PollableSource接口
  • 它的主要方法是:
    • configure(Context context)//初始化context(读取配置文件内容)
    • process()//获取数据封装成event并写入channel,这个方法将被循环调用

      案例实现
  • 第一步还是创建maven工程并且导入依赖
     <dependencies>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.9.0</version>
            </dependency>
        </dependencies>
    

Mysource具体代码

package com.company;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource 

    //定义配置文件将来要读取的字段
    private Long delay;
    private String field;

    //初始化配置信息
    @Override
    public void configure(Context context) 
        delay = context.getLong("delay");
        field = context.getString("field", "Hello!");
    

    @Override
    public Status process() throws EventDeliveryException 

        try 
            //创建事件头信息
            HashMap<String, String> hearderMap = new HashMap<>();
            //创建事件
            SimpleEvent event = new SimpleEvent();
            //循环封装事件
            for (int i = 0; i < 5; i++) 
                //给事件设置头信息
                event.setHeaders(hearderMap);
                //给事件设置内容
                event.setBody((field + i).getBytes());
                //将事件写入channel
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            
         catch (Exception e) 
            e.printStackTrace();
            return Status.BACKOFF;
        
        return Status.READY;
    

    @Override
    public long getBackOffSleepIncrement() 
        return 0;
    

    @Override
    public long getMaxBackOffSleepInterval() 
        return 0;
    

将写好的Source类打包,改个名字,放到flume中

名字改为 mySource.jar
放到 /opt/module/flume/lib文件夹下

配置flume文件:先在hadoop102, hadoop103,hadoop104.创建一个文件夹,存储配置文件

mkdir /opt/module/flume/job/group5

这个需求比较简单,直接一个flume即可

vim /opt/module/flume/job/group5/mysource.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.company.source.MySource
a1.sources.r1.delay = 1000
#a1.sources.r1.field = atguigu

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

运行flume

flume-ng agent -c /opt/module/flume/conf/ -f /opt/module/flume/job/group4/mysource.conf -n a1 -Dflume.root.logger=INFO,console

自定义sink

自定义sink也是一样,当根据业务需要时,就需要自定义sink来满足我们的需求,比如使用sink将数据写入到mysql中(后面会介绍sqoop,它专门用于写入数据库)

自定义sink的说明文档地址: https://flume.apache.org/FlumeDeveloperGuide.html#sink

自定义Sink案例实现

案例需求

  • 使用flume接收数据,并在Sink端给每条数据添加前缀和后缀,输出到控制台
  • 前后缀可在flume任务配置文件中配置

需求分析:

  • 同样的,我们需要了解Sink的实现方法,通过源码可以知道,自定义Sink需要继承AbstractSink类并实现Configurable接口
  • 主要实现的方法是:
    • configure(Context context)//初始化context(读取配置文件内容)
    • process()//从Channel读取获取数据(event),这个方法将被循环调用

案例实现

  • 第一步还是创建maven工程并且导入依赖
     <dependencies>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.9.0</version>
            </dependency>
        </dependencies>
    

mysink具体代码

package com.company;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable 

    //创建Logger对象
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException 

        //声明返回值状态信息
        Status status;

        //获取当前Sink绑定的Channel
        Channel ch = getChannel();

        //获取事务
        Transaction txn = ch.getTransaction();

        //声明事件
        Event event;

        //开启事务
        txn.begin();

        //读取Channel中的事件,直到读取到事件结束循环
        while (true) 
            event = ch.take();
            if (event != null) 
                break;
            
        
        try 
            //处理事件(打印)
            LOG.info(prefix + new String(event.getBody()) + suffix);

            //事务提交
            txn.commit();
            status = Status.READY;
         catch (Exception e) 

            //遇到异常,事务回滚
            txn.rollback();
            status = Status.BACKOFF;
         finally 

            //关闭事务
            txn.close();
        
        return status;
    

    @Override
    public void configure(Context context) 

        //读取配置文件内容,有默认值
        prefix = context.getString("prefix", "hello:");

        //读取配置文件内容,无默认值
        suffix = context.getString("suffix");
    

将写好的Sink类打包,改个名字,放到flume中

名字改为 mySink.jar
放到 /opt/module/flume/lib文件夹下

配置flume文件:先在hadoop102, hadoop103,hadoop104.创建一个文件夹,存储配置文件

mkdir /opt/module/flume/job/group6

这个需求比较简单,直接一个flume即可

vim /opt/module/flume/job/group5/mysink.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.atguigu.MySink
#a1.sinks.k1.prefix = atguigu:
a1.sinks.k1.suffix = :atguigu

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

运行flume

flume-ng agent -c /opt/module/flume/conf/ -f /opt/module/flume/job/group4/mysink.conf -n a1 -Dflume.root.logger=INFO,console

使用Ganglia监控Flume

Ganglia可以让我们通过web页面,很方便地监控我们Flume的运行状态
Ganglia由gmond、gmetad和gweb三部分组成

  • gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用gmond,你可以很容易收集很多系统指标数据,如CPU、内存、磁盘、网络和活跃进程的数据等。
  • gmetad(Ganglia Meta Daemon)整合所有信息,并将其以RRD格式存储至磁盘的服务
  • gweb(Ganglia Web)Ganglia可视化工具,gweb是一种利用浏览器显示gmetad所存储数据的PHP前端。在Web界面中以图表方式展现集群的运行状态下收集的多种不同指标数据

Ganglia安装与部署

第一步: 三台节点安装epel源

sudo yum install -y epel-release

第二步: 在102安装web,meta和monitor

sudo yum -y install ganglia-gmetad ganglia-web ganglia-gmond

第三步: 在103、104安装monitor

sudo yum -y install ganglia-gmond

第四步:修改hadoop102的 ganglia 配置文件

sudo vim /etc/httpd/conf.d/ganglia.conf

<Location /ganglia>
  Require ip 192.168.5.1
  Require all granted
</Location>

第五步:修改hadoop102的 gmetad 配置文件

sudo vim /etc/ganglia/gmetad.conf

data_source "hadoop102" hadoop102

第六步:修改hadoop102的 gmond 配置文件

sudo vim /etc/ganglia/gmond.conf

cluster 
  name = "hadoop102"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"

udp_send_channel 
  #bind_hostname = yes # Highly recommended, soon to be default.
                       # This option tells gmond to use a source address
                       # that resolves to the machine's hostname.  Without
                       # this, the metrics may appear to come from any
                       # interface and the DNS names associated with
                       # those IPs will be used to create the RRDs.
  # mcast_join = 239.2.11.71
  host = hadoop102
  port = 8649
  ttl = 1

udp_recv_channel 
  # mcast_join = 239.2.11.71
  port = 8649
  bind = 0.0.0.0
  retry_bind = true

第七步:同步修改后的文件

xsync  /etc/httpd
xsync  /etc/ganglia

第八步:修改hadoop selinux 配置文件

sudo vim /etc/selinux/config
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
#     enforcing - SELinux security policy is enforced.
#     permissive - SELinux prints warnings instead of enforcing.
#     disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
#     targeted - Targeted processes are protected,
#     mls - Multi Level Security protection.
SELINUXTYPE=targeted

第九步:重启各个节点服务器

sudo reboot 

第十步:启动 ganglia

# hadoop102
start httpd
start gmetad
start gmond
# hadoop103/hadoop104
start gmond
start gmond

使用ganglia监控Flume

打开网页浏览:http://192.168.1.102/ganglia

我们启动一下前面的示例,就以necat flume举例

vim /opt/module/flume/打怪升级之小白的大数据之旅(七十三)<flume高级>(代码片段)

打怪升级之小白的大数据之旅(七十三)Flume高级上次回顾上一章介绍了Flume的内部原理,本章就Flume的扩展知识进行讲解,本章的重点就是了解并学会使用Flume的自定义组件自定义组件在上一章介绍了内部原理,所以下... 查看详情

打怪升级之小白的大数据之旅(七十四)<初识kafka>(代码片段)

打怪升级之小白的大数据之旅(七十四)初识Kafka引言学完Flume之后,接下来将为大家带来Kafka相关的知识点,在工作中,Kafka和Flume经常会搭配使用,那么Kafka究竟是什么呢?让我们开始今天的内容吧Kafka地图惯例&... 查看详情

打怪升级之小白的大数据之旅(七十一)<hadoop生态:初识flume>(代码片段)

打怪升级之小白的大数据之旅(七十一)Hadoop生态:初识Flume上次回顾上一章,我们学习完了hive的内容,本章开始是Hadoop中经常使用的另外一个框架Flume初识Flume下面这个是flume的标志flume的中文是水槽,但我觉得将它... 查看详情

打怪升级之小白的大数据之旅(七十二)<flume进阶>(代码片段)

打怪升级之小白的大数据之旅(七十二)Flume进阶上次回顾上一章对Flume的基础知识点进行了分享,有了上一章的铺垫,本章就深入学习一下Flume的进阶知识点Flume的使用很简单,主要就是写配置文件,至于具体怎么配... 查看详情

打怪升级之小白的大数据之旅(六十三)<hive旅程第四站:ddl操作>(代码片段)

打怪升级之小白的大数据之旅(六十三)Hive旅程第四站:DDL操作上次回顾上一章,我们学习了Hive的数据类型以及访问方式,本章节我们对数据库与数据表的操作进行学习DDL操作数据库操作的CURD数据库的操作和mysql相同,... 查看详情

打怪升级之小白的大数据之旅(七十四)<初识kafka>(代码片段)

打怪升级之小白的大数据之旅(七十四)初识Kafka引言学完Flume之后,接下来将为大家带来Kafka相关的知识点,在工作中,Kafka和Flume经常会搭配使用,那么Kafka究竟是什么呢?让我们开始今天的内容吧Kafka地图惯例&... 查看详情

打怪升级之小白的大数据之旅(七十)<hive旅程终点站:hive的综合案例>(代码片段)

打怪升级之小白的大数据之旅(七十)Hive旅程终点站:Hive的综合案例本章内容本章是Hive的最后一章,主要是通过一个案例来对我们前面所学的知识点进行一个实操总结,大家根据案例查漏补缺,哪里知识点不会就补一下... 查看详情

打怪升级之小白的大数据之旅(六十二)<hive旅程第三站:hive数据类型>(代码片段)

打怪升级之小白的大数据之旅(六十二)Hive旅程第三站:Hive数据类型上次回顾上一章,我们对Hive的安装进行了学习,本章正式学习Hive的相关操作,按照惯例,学习一个新的语言就要了解它的数据类型数据类型Hive的... 查看详情

打怪升级之小白的大数据之旅(六十一)<hive旅程第二站:hive安装>(代码片段)

打怪升级之小白的大数据之旅(六十一)Hive旅程第二站:Hive安装上次回顾上一章我们学习了Hive的概念以及框架原理,本章节是对Hive的安装进行分享,因为它有些需要自己配置的点,所以我单独开了一个章节Hive安装前期... 查看详情

打怪升级之小白的大数据之旅(六十四)<hive旅程第五站:dml基本操作>(代码片段)

打怪升级之小白的大数据之旅(六十四)Hive旅程第五站:DML基本操作上次回顾上一章,我们学习了Hive的DDL操作,学会如何操作数据库、数据表后,本章我们就要开始学习如何将数据导入到表中,如何将数据从表中导... 查看详情

打怪升级之小白的大数据之旅(六十九)<hive旅程第十站:hive的优化>(代码片段)

打怪升级之小白的大数据之旅(六十九)Hive旅程第十站:Hive的优化上次回顾上一章介绍了Hive的压缩与存储格式,本章节是Hive的一起其他优化方法Fetch抓取Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算从hive... 查看详情

打怪升级之小白的大数据之旅(六十五)<hive旅程第六站:hive的查询>(代码片段)

打怪升级之小白的大数据之旅(六十五)Hive旅程第六站:Hive的查询上次回顾经过前面的学习,我们已经可以初步使用Hive对数据的一些简单操作了,本章节是Hive的一个重点内容–查询查询HQL和我们前面学习的Mysql语法是一样... 查看详情

打怪升级之小白的大数据之旅(六十八)<hive旅程第九站:hive的压缩与存储>(代码片段)

打怪升级之小白的大数据之旅(六十八)Hive旅程第九站:Hive的压缩与存储上次回顾上一章,我们学习完了hive的函数相关操作,到此,我们hive的大的知识点就全部介绍完毕了,当然了,还有一些细节我没有讲到... 查看详情

打怪升级之小白的大数据之旅(六十六)<hive旅程第七站:hive的分区表与分桶表>(代码片段)

打怪升级之小白的大数据之旅(六十六)Hive旅程第七站:Hive的分区表与分桶表上次回顾上一章,我们学习了Hive的查询相关语法,本章节我们学习一下分区表与分桶表分区表通过前面Hive的学习,我们知道,表在HDFS中... 查看详情

打怪升级之小白的大数据之旅(六十)<hive旅程中的始发站>(代码片段)

打怪升级之小白的大数据之旅(六十)Hive旅程中的始发站引言经过了前面Hadoop、MR、Java、MySQL以及Linux的洗礼,接下来我们就要进入到大数据中特别重要的一个知识点学习–Hive,Hive是我们大数据日常工作中必不可少的一个技能&#x... 查看详情

flask连接数据库打怪升级之旅

...一个学习经验,也就是我们今天分享的连接数据库部分的打怪升级之旅。希望可以为大家在学习Python的路上提供一些参考。初级阶段首先安装Mysql扩展包650)this.width=650; 查看详情

flask连接数据库打怪升级之旅

...、初级阶段 1.Mysql扩展包 2.建立数据库连接3.开启打怪升级之路 在日常开发中,连 查看详情

通信算法之七十三:多径信道下的时频域turboequalization算法研究

 迭代均衡算法  查看详情