flume整合kafka(代码片段)

数月亮 数月亮     2022-11-19     452

关键词:

一、需求

  • 利用flume采集Linux下的文件信息,并且传入到kafka集群当中。
  • 环境准备zookeeper集群和kafka集群安装好。

二、配置flume

  • 官网下载flume。博主自己这里使用的是flume1.6.0。
  • 官网地址http://flume.apache.org/download.html
  • 解压缩。tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /usr/apps/
  • 创建flume配置文件。
  • cd /usr/apps/flume/apache-flume-1.6.0-bin/conf
  • vim exec.conf  flume配置文件如下
  1 a1.sources = r1
  2 a1.channels = c1
  3 a1.sinks = k1
  4 a1.sources.r1.type = exec
  5 a1.sources.r1.command = tail -F /usr/test/click_log/1.log
  6 a1.sources.r1.channels = c1
  7 a1.channels.c1.type=memory
  8 a1.channels.c1.capacity=10000
  9 a1.channels.c1.transactionCapacity=100
 10 #固定的不需要修改
 11 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
 12 #消息的主题
 13 a1.sinks.k1.topic = logMq
 14 #kafka broker集群
 15 a1.sinks.k1.brokerList = bhz136:9092,bhz137:9092,bhz139:9092
 16 a1.sinks.k1.requiredAcks = 1
 17 a1.sinks.k1.batchSize = 20
 18 a1.sinks.k1.channel = c1

三、启动flume连接到kafka

  • ./bin/flume-ng agent -n a1 -c conf -f conf/exec.conf -Dflume.root.logger=INFO,console

四、启动kafka消费者消费数据

  • kafka-console-consumer.sh --zookeeper bhz136:2181 --from-beginning --topic logMq

五、运行测试程序。测试结果如下:

image

注:1.log文件是通过以下脚本生成的测试文件

  1 for((i=0;i<=50000;i++));
  2         do echo "message-"+$i >>/usr/test/click_log/1.log;
  3 done


flume整合kafka(基于kerberos认证)——完成实时数据采集(代码片段)

如果现在要想将flume中的sink设置为kafka,因为在实际的开发中,可能会有若干个子系统或者若干个客户端进行flume日志采集,那么能够承受这种采集任务量的只有kafka来完成,可是需要注意一个问题,现在的kafka是采用了Kerberos认... 查看详情

sparkstreaming整合kafka(代码片段)

项目架构:日志数据---->flume----->kafka-------->sparkstreaming---------->mysql/redis/hbase前置条件:安装zookeeper安装flume安装kafakhadoop实现高可用(1)实现flume收集数据到kafka启动kafak:nohupkafka-server-start.sh/application/kafka_2.11-1.1.0/config/serve... 查看详情

flume整合数据到kafka,sparkstreaming消费数据,并存储到hbase和redis中(代码片段)

目录1、模拟数据生成2、flume采集数据 1、node01配置flume的conf文件 2、node02开发flume的配置文件3、node03开发flume的配置文件4、开发flume启动停止脚本 5、node01执行以下命令创建kafka的topic6、启动并查看kafka的数据3、SparkStreaming消费ka... 查看详情

kafka集成整合外部插件(springboot,flume,flink,spark)(代码片段)

一kafka集成springboot1.工程结构 2.pom文件<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.1</version> 查看详情

flume整合kafka完成实时数据采集(代码片段)

agent选择agent1 execsource+memorychannel+avrosinkagent2 avrosource+memorychannel 模拟实际工作中的场景,agent1为A机器,agent2为B机器。 avrosource:监听avro端口,并且接收来自外部avro信息,avrosink:一般用于跨节点传输,主要绑定数... 查看详情

flume+kafka+storm+redis大数据在线实时分析(代码片段)

...示中也能很好地说明这一点),即需要做各个系统之前的整合,包括Flume与Kafka的整合,Kafka与Storm的整合。当然,各个环境是否使用集群,依个人的实际需要而定,在我们的环境中,Flum 查看详情

day548.kafka相关外部系统整合-kafka(代码片段)

Kafka相关外部系统整合一、集成FlumeFlume是一个在大数据开发中非常常用的组件。可以用于Kafka的生产者,也可以用于Flume的消费者。1、Flume生产者启动kafka集群zk.shstartkf.shstart启动kafka消费者bin/kafka-console-consumer.sh--bootstrap-serverh... 查看详情

sparkstreaming基于sparkstreaming&flume&kafka打造通用流处理平台(代码片段)

通用流处理平台整合日志输出到Flume1.pom.xml2.结合log4j产生日志3.编写Flume配置文件streaming.conf4.Flume启动5.配置log4j.properties6.启动IDEA程序,查看日志接收情况整合Flume到Kafka1.启动zookeeper2.启动kafka3.查看Kafka的topic列表4.创建一个新... 查看详情

flume整合数据到kafka,sparkstreaming消费数据,并存储到hbase和redis中(代码片段)

目录1、模拟数据生成2、flume采集数据 1、node01配置flume的conf文件 2、node02开发flume的配置文件3、node03开发flume的配置文件4、开发flume启动停止脚本 5、node01执行以下命令创建kafka的topic6、启动并查看kafka的数据3、SparkStreaming消费ka... 查看详情

flume整合kafka

flume整合kafka:flume采集业务日志,发送到kafka 安装部署KafkaDownload1.0.0isthelatestrelease.Thecurrentstableversionis1.0.0.Youcanverifyyourdownloadbyfollowingthese procedures andusingthese KEYS.1.0 查看详情

flume整合kafka

1,安装并成功能运行flume2,安装并成功能运行kafka3,安装并成功能运行zookeeper4,开始整合flume收集的数据,写入kafkaa,修改flume的配置文加:vim flume_kafka.confagent1.sources=r1agent1.sinks=k1agent1.channels=c1#Describe/configurethesourceagent1.so 查看详情

flume+kafka整合

Flume+Kafka整合  一、准备工作准备5台内网服务器创建Zookeeper和Kafka集群服务器地址:192.168.2.240192.168.2.241192.168.2.242192.168.2.243192.168.2.244服务器系统:Centos6.5 64位  下载安装包Zookeeper:http://apache.fayea 查看详情

flume整合kafka

 背景:系统的数据量越来越大,日志不能再简单的文件的保存,如此日志将会越来越大,也不方便查找与分析,综合考虑下使用了flume来收集日志,收集日志后向kafka传递消息,下面给出具体的配置#Theconfigurationfileneedstodefinet... 查看详情

log4j整合flume(代码片段)

1.环境CDH5.16.1Spark2.3.0cloudera4Kafka2.1.0+kafka4.0.02.Log4j——>Flume2.1Log4j产生日志importorg.apache.log4j.Logger;/***@ClassNameLoggerGenerator*@Authorwuning*@Date:2020/2/310:54*@Description:模拟日志输出*/publicclassLoggerGeneratorprivatestaticLoggerlogger=Logger.getLogger(Logge... 查看详情

flume学习笔记之flumeng+kafka整合

FlumeNG集群+Kafka集群整合: 修改Flume配置文件(flume-kafka-server.conf),让Sink连上Kafkahadoop1:#setAgentnamea1.sources=r1a1.channels=c1a1.sinks=k1#setchannela1.channels.c1.type=memorya1.channels.c1.capacity=1000a 查看详情

flume对接kafka(代码片段)

目录一.简单实现二.自定义intercepor实现数据分离一.简单实现需求:根据flume监控exec文件的追加数据,写入kafka的test-demo分区,然后启用kafka-consumer消费test-demo分区数据。需求分析1)flume的配置文件在hadoop102上创建flume的配置文件#def... 查看详情

kafka+flume-ng+hdfs整合

Kafka    由LinkedIn于2010年12月(https://thenewstack.io/streaming-data-at-linkedin-apache-kafka-reaches-1-1-trillion-messages-per-day/)开源出来一个消息的发布/订阅系统,用scala实现;版本从0.05到现在0.10.2.0(2017-02-25)     系统中, 查看详情

sparkstreaming整合flume(代码片段)

SparkStreaming整合flume在实际开发中push会丢数据,因为push是由flume将数据发给程序,程序出错,丢失数据。所以不会使用不做讲解,这里讲解poll,拉去flume的数据,保证数据不丢失。1.首先你得有flume比如你有:【如果没有请走这篇... 查看详情