flume+kakfa+sparkstream实时处理数据测试

     2022-04-05     609

关键词:

flume:从数据源拉取数据

kafka:主要起到缓冲从flume拉取多了的数据
sparkStream:对数据进行处理
 
一.flume拉取数据
 
1.源数据文件读取配置
 
在flume目录的conf目录下配置读取数据源的配置,配置一个test.properties文件,内容如下:
 
a1.sources = r1
a1.channels = c1 
a1.sinks = k1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/hjh/spark/test.txt
a1.sources.r1.restartThrottle = 1000
a1.sources.r1.logStdErr = true
#a1.sources.r1.restart = true
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.keepalive = 100
a1.sinks.k1.type =org.apache.flume.plugins.KafkaSink
a1.sinks.k1.metadata.broker.list=192.168.22.7:9092,192.168.22.8:9092,192.168.22.9:9092
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.request.required.acks=1
a1.sinks.k1.max.message.size=1000000
a1.sinks.k1.producer.type=sync
a1.sinks.k1.custom.encoding=UTF-8
a1.sinks.k1.custom.topic.name=test
a1.sinks.k1.channel=c1
a1.sinks.k1.product.source.name=6
配置读取源文件的读取路径如下:
a1.sources.r1.command = tail -F /home/hadoop/hjh/spark/test.txt
读取的数据传到kafka的哪个topic下:
a1.sinks.k1.custom.topic.name=test
2.启动flume读取数据
bin/flume-ng  agent -c conf -f conf/test.properties -n a1 -Dflume.root.logger=INFO,console
 
二.kafka缓冲数据
 
   1.启动zookeeper服务
bin/zookeeper-server-start.sh config/zookeeper.properties
   2.启动kafka服务
bin/kafka-server-start.sh config/server.properties &
3.创建一个topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
集群情况下,localhost换成集群的master地址
4.查看kafka的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
三.SparkStream处理数据
1.用spark中自带例子进行测试
进入spark目录
bin/run-example org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 my-consumer-group test 1
zoo01,zoo02,zoo03替换为集群的zookeeper地址
2.往源文件中加入数据
echo "test test" >> test.txt
sparkStream会统计源数据中单词的数量并输出
 
 

flume+sparkstreaming实例实时监控文件demo

1,flume所在的节点不和spark同一个集群 v50和10-15节点flume在v50里面flume-agent.conf spark是开的work节点,就是单点计算节点,不涉及到master发送管理只是用到了sparkStreming的实时功能开启的是spark-shell不是spark-submit提交jar的形式... 查看详情

python爬虫等获取实时数据+flume+kafka+sparkstreaming+mysql+echarts实现数据动态实时采集分析展示

使用爬虫等获取实时数据+Flume+Kafka+SparkStreaming+mysql+Echarts实现数据动态实时采集、分析、展示主要工作流程如下所示:其中爬虫获取实时数据,并把数据实时传输到Linux本地文件夹中。使用Flume实时监控该文件夹,如果发现文件内... 查看详情

基于flume+kafka+sparkstreaming打造实时流处理项目实战课程

...消息队列Kafka、分布式列式数据库HBase、及当前最火爆的SparkStreaming打造实时流处理项目实战,让你掌握实时处理的整套处理流程,达到大数据中级研发工程师的水平!下载地址:百度网盘下载    IT交流群:9780552 查看详情

python爬虫等获取实时数据+flume+kafka+sparkstreaming+mysql+echarts实现数据动态实时采集分析展示(代码片段)

使用爬虫等获取实时数据+Flume+Kafka+SparkStreaming+mysql+Echarts实现数据动态实时采集、分析、展示主要工作流程如下所示:其中爬虫获取实时数据,并把数据实时传输到Linux本地文件夹中。使用Flume实时监控该文件夹,如果发现文件内... 查看详情

flume+kafka+sparkstreaming+hdfs构建实时日志分析系统

一、Hadoop配置安装        注意:apache提供的hadoop-2.6.0的安装包是在32位操作系统编译的,因为hadoop依赖一些C++的本地库,所以如果在64位的操作上安装hadoop-2.4.1就需要重新在64操作系统上重新编译1.修... 查看详情

[flume][kafka]flume与kakfa结合例子(kakfa作为flume的sink输出到kafkatopic)

Flume与Kakfa结合例子(Kakfa作为flume的sink输出到Kafkatopic)进行准备工作:$sudomkdir-p/flume/web_spooldir$sudochmoda+w-R/flume 编辑flume的配置文件:$cat/home/tester/flafka/spooldir_kafka.conf#Namethecomponentsonthisagentage 查看详情

sparkstreaming+flume+kafka实时流式处理完整流程(代码片段)

目录sparkstreaming+flume+kafka实时流式处理完整流程一、前期准备二、实现步骤1.引入依赖2.日志收集服务器3.日志接收服务器4、spark集群处理接收数据并写入数据库5、测试结果sparkstreaming+flume+kafka实时流式处理完整流程一... 查看详情

sparkstreaming和kafka数据丢失怎么处理

...数据生产者,比如flume.flume负责生产数据,发送至kafka。sparkstreaming作为消费者,实时的从kafka中获取数据进行计算。计算结果保存至redis,供实时推荐使用。flume+kafka+spark+redis是实时数据收集与计算的一套经典架构... 查看详情

慕课网实战sparkstreaming实时流处理项目实战笔记三之铭文升级版

铭文一级:Flume概述Flumeisadistributed,reliable,andavailableserviceforefficientlycollecting(收集),aggregating(聚合),andmoving(移动)largeamountsoflogdatawebserver(源端)===>flume===>hdfs(目的地)设计目标: 可靠性 扩展性 管理性业界同 查看详情

sparkstreaming

2.4.6.1.1概论SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,并将结果保存到外部文件系统、数据库或应... 查看详情

sparkstreaming简介

 SparkStreaming是coreSpark的一个扩展,用来处理实时数据流,数据源可以来自Kafka,Flume,HDFS等,经过复杂的算法处理后,存入HDFS,数据库,或者实时的Dashboards. 从内部来看,SparkStreaming把进来的流式数据切成一小块一小块,然... 查看详情

sparkstreaming实时处理应用(代码片段)

...部署整个方案时,kafka和flume组件都执行得非常好,但是sparkstreaming应用需要花费4-8分钟来处理单个batch。这个延迟的原因有两点,一是我们使用DataFrame来强化数据,而强化数据需要从hive中读取大量的数据;二是我们的参数配置不... 查看详情

flume实时监控目录sink到hdfs

目标:Flume实时监控目录sink到hdfs,再用sparkStreaming监控hdfs的这个目录,对数据进行计算1、flume的配置,配置spoolDirSource_hdfsSink.properties,监控本地的一个目录,上传到hdfs一个目录下。agent1.channels=ch1agent1.sources=spoolDir-source1agent1.sin... 查看详情

sparkstreaming

一、flume整合sparkStreaming问题  1、如何实现sparkStreaming读取flume中的数据   sparkStreaming整合flume有2中模式,一种是拉模式,一种是推模式。比较两种模式的特点,如何部署。   推模式:Flume将数据Push推给SparkStreaming  ... 查看详情

flume整合数据到kafka,sparkstreaming消费数据,并存储到hbase和redis中(代码片段)

...1执行以下命令创建kafka的topic6、启动并查看kafka的数据3、SparkStreaming消费kafka中的数据 1、第一步sparkStreaming的连接2、第二步从kafka中获取数据信息,写了一个自定义方法getStreamingContextFromHBase3、第三步、消费数据,解析数... 查看详情

sparkstreaming整合flume

1目的  SparkStreaming整合Flume。参考官方整合文档(http://spark.apache.org/docs/2.2.0/streaming-flume-integration.html)2整合方式一:基于推2.1基本要求flume和spark一个work节点要在同一台机器上,flume会在本机器上通过配置的端口推送数据streami... 查看详情

sparkstreaming整合flume(代码片段)

SparkStreaming整合flume在实际开发中push会丢数据,因为push是由flume将数据发给程序,程序出错,丢失数据。所以不会使用不做讲解,这里讲解poll,拉去flume的数据,保证数据不丢失。1.首先你得有flume比如你有:【如果没有请走这篇... 查看详情

sparkstreaming学习之一sparkstreaming初识

...TP:Xftp4  jdk1.8  scala-2.10.4(依赖jdk1.8)  spark-1.6一、SparkStreaming简介SparkStreaming是流式处理框架,是SparkAPI的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是:Kafka,Flume,Twitter,ZeroMQ或者TCPsockets,... 查看详情