elk之数据收集传输过滤filebeat+logstash部署

author author     2022-11-05     605

关键词:

本文与前文是有关联的,之前的两篇文章客官可以抬腿出门右转 导读ELK 之前端ELK 之分布式发
#前端和消息队列搞定之后,我们需要安装数据采集工具filebeats和数据过滤机运输工具Logstash,一般情况我们都使用filebeats 用来收集日志文件,我自定义了一个log文件,文件内容如下:
55.3.244.1 GET /index.html 15824 0.043
55.3.244.1 GET /index.html 15824 0.043
文件位置放到/tmp/test.log

cd /opt/elk/filebeat-6.2.2-linux-x86_64
#filebeat 的配置文件大体上分为两部分,输入部分和输出部分,输入指定输入源即可,输出可以选择直接到elasticsearch,logstash或者是消息队列(redis&kafka)等,本文采用kafka 作为消息队列;
vi filebeat.yml
filebeat.prospectors:
#日志类型

  • type: log
    enabled: True

    日志路径可以

    写多个,支持通配符
    paths:

    • /tmp/test.log
      #设置字符集编码
      encoding: utf-8
      #文档类型
      document_type: my-nginx-log
      #每十秒扫描一次
      scan_frequency: 10s

      实际读取文件时,每次读取 16384 字节

      harverster_buffer_size: 16384
      #The maximum number of bytes that a single log message can have. All bytes after max_bytes are discarded and not sent. This setting is especially useful for multiline log messages, which can get large. The default is 10MB (10485760). 一次发生的log大小值;
      max_bytes: 10485760

      是否从文件末尾开始读取

      tail_files: true
      #给日志加上tags方便在logstash过滤日志的时候做判断
      tags: ["nginx-access"]
      #剔除以.gz 结尾的文件
      exclude_files: [".gz$"]
      #output 部分的配置文档
      #配置输出为kafka,无论你是tar包还是rpm安装,目录里边会看到官方提供的filebeat.full.yml OR filebeat.reference.yml 这里边有filebeat 所有的input 方法和output 方法供你参考;
      output.kafka:
      enabled: true
      #kafka 的server,可以配置集群,例子如下:
      hosts:["ip:9092","ip2:9092","ip3:9092"]
      #这个非常重要,filebeat作为provider,把数据输入到kafka里边,logstash 作为消费者去消费这些信息,logstash的input 中需要这个topic,不然logstash没有办法取到数据。
      topic: elk-%[type]

      The number of concurrent load-balanced Kafka output workers. kafka 的并发运行进程

      worker: 2
      #当传输给kafka 有问题的时候,重试的次数;
      max_retries: 3
      #单个kafka请求里面的最大事件数,默认2048
      bulk_max_size: 2048
      #等待kafka broker响应的时间,默认30s
      timeout: 30s
      #kafka broker等待请求的最大时长,默认10s
      broker_timeout: 10s
      #每个kafka broker在输出管道中的消息缓存数,默认256
      channel_buffer_size: 256
      #网络连接的保活时间,默认为0,不开启保活机制
      keep_alive: 60
      #输出压缩码,可选项有none, snappy, lz4 and gzip,默认为gzip (kafka支持的压缩,数据会先被压缩,然后被生产者发送,并且在服务端也是保持压缩状态,只有在最终的消费者端才会被解压缩)
      compression: gzip
      #允许的最大json消息大小,默认为1000000,超出的会被丢弃,应该小于broker的??message.max.bytes(broker能接收消息的最大字节数)
      max_message_bytes: 1000000
      #kafka的响应返回值,0位无等待响应返回,继续发送下一条消息;1表示等待本地提交(leader broker已经成功写入,但follower未写入),-1表示等待所有副本的提交,默认为1
      required_acks: 0
      #The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats"。客户端ID 用于日志怕错,审计等,默认是beats。
      client_id: beats
      #测试配置文件:/opt/elk/filebeat/filebeat -c /opt/elk/filebeat/filebeat.yml test config
      #如果配置文件没有问题的话,会出现config ok ,如果有问题会提示具体问题在哪里。
      #启动filebeat
      可以先通过 /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat -c -e /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat.yml 查看一下输入filebeat是否工作正常,会有很多信息打印到屏幕上;
      nohup /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat -c /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat.yml >>/dev/null 2>&1&

logstash 安装配置:
input
#数据来源
kafka
#这个对应filebeat的output 的index
topics_pattern => "elk-.*"
#kafka 的配置
bootstrap_servers => "IP1:9092,IP2:9092,IP3:9092"
kafka 中的group ID
group_id => "logstash-g1"


#过滤器,如果你想要日志按照你的需求输出的话,需要在logstash中过滤并且给与相应的key,比如以下的是nginx 日志,用的是logstash内置好的过滤器,%IP:client 这个里边包含了两个信息:ip地址,client 是在kibana 中显示的自定义键值,最终显示成这样,http://grokdebug.herokuapp.com/ 是在线调试logstash filter的工具,你可以在线调试你的过滤规则。

filter
grok
match => "message" => "%IP:client %WORD:method %URIPATHPARAM:request %NUMBER:bytes %NUMBER:duration"
#因为我们取出了字段,所以不需要原来这个message字段,这个字段里边包含之前beat 输入的所有字段。
remove_field => ["message"]


output
elasticsearch
hosts => ["IP:9200"]
#这个是在kibana上的index Patterns 的索引,建议什么服务就用干什么名字,因为beat 提供了些kibana的模板可以导入,导入的模板匹配的时候用的索引是对应服务的名称开头 。
index => "nginx-%+YYYY.MM.dd"
document_type => "nginx"
#每次20000 发送一次数据到elasticsearch
flush_size => 20000
如果不够20000,没10秒会发送一次数据;、
idle_flush_time =>10

日志从进入到输出的流程图:
技术分享图片

Kibana 上边的日志通过logstash 过滤之后取出来在kibana上显示如下:
技术分享图片

elk(代码片段)

...综合实验案例一:单机ELK部署案例二.JAVA环境配置,部署filebeat+Elasticsearch收集apache日志nginx日志收集配置mysqlslow慢日志收集elk简介ELK是三个开源软件的缩写,分别表示:Elasticsearch,Logstash,Kibana,它们都是开源软件。新增了一个FileBea... 查看详情

elk之filebeat收集多日志并自定义索引(代码片段)

...志,在此实验基础上,增加nginx的json日志收集,并自定义filebeat的索引。本次实验也是基于《ELK收集Apache的json格式访问日志并按状态码绘制图表》;2、将nginx和Apache的日志按照状态码绘制柱状图,并将其添加到dashboard;环境说明... 查看详情

使用filebeat采集日志结合logstash过滤出你想要的日志

...ithub.com/deviantony/docker-elk这里对es不做过多描述,主要针对filebeat和logstash讲解。Filebeat是一个轻量级的托运人,用于转发和集中日志数据。Filebeat作为代理安装在服务器上,监视您指定的日志文件或位置,收集日志事件,并将它们... 查看详情

elk日志系统设计方案-filebeat日志收集推送kafka(代码片段)

...目部署至服务器,运行服务后会生成日志文件。通过Filebeat监控相关文件夹,当有新日志产生,就读取新日志,将日志输送到Kafka中。经由Logstash消费Kafka生产的数据,进行加工过滤后输出到ElasticSearch进行日志... 查看详情

elk之elasticsearch插件导致filebeat没有上传日志至elasticsearch解决办法

  使用filebeat收集nginx发现日志为上传,elasticsearch没有日志,kibana没有展示  查看filebeat日志  日志目录为/var/log/filebeat 下面有多个日志文件,如果在日志文件filebeat没有发现报错信息可以查看filebeat.1234567等,发现日志... 查看详情

elasticsearch:elk架构(代码片段)

...ogstash导入数据到ES同步数据库数据到Elasticsearch什么是BeatsFileBeat简介FileBeat的工作原理logstashvsFileBeatFilebeat安装ELK整合实战案例:采集tomcat服务器日志使用FileBeats将日志发送到Logstash配置Logstash接收FileBeat收集的数据并打印Logstas... 查看详情

elk日志系统设计方案-filebeat日志收集推送kafka(代码片段)

...目部署至服务器,运行服务后会生成日志文件。通过Filebeat监控相关文件夹,当有新日志产生,就读取新日志,将日志输送到Kafka中。经由Logstash消费Kafka生产的数据,进行加工过滤后输出到ElasticSearch进行日志... 查看详情

elk日志系统设计方案-filebeat日志收集推送kafka(代码片段)

...目部署至服务器,运行服务后会生成日志文件。通过Filebeat监控相关文件夹,当有新日志产生,就读取新日志,将日志输送到Kafka中。经由Logstash消费Kafka生产的数据,进行加工过滤后输出到ElasticSearch进行日志... 查看详情

elk日志系统设计方案-filebeat日志收集推送kafka(代码片段)

...目部署至服务器,运行服务后会生成日志文件。通过Filebeat监控相关文件夹,当有新日志产生,就读取新日志,将日志输送到Kafka中。经由Logstash消费Kafka生产的数据,进行加工过滤后输出到ElasticSearch进行日志... 查看详情

海量日志下的日志架构优化:filebeat+logstash+kafka+elk(代码片段)

前言:实验需求说明在前面的日志收集中,都是使用的filebeat+ELK的日志架构。但是如果业务每天会产生海量的日志,就有可能引发logstash和elasticsearch的性能瓶颈问题。因此改善这一问题的方法就是filebeat+logstash+kafka+ELK,也就是将... 查看详情

elk应用之filebeat

参考技术AFilebeat是本地文件的日志数据采集器,可监控日志目录或特定日志文件(tailfile),并将它们转发给Elasticsearch或Logstatsh进行索引、kafka等。带有内部模块(auditd,Apache,Nginx,System和MySQL),可通过一个指定命令来简化通... 查看详情

如何利用filebeat把不同服务器上的log4j日志传输到同一台elk服务器

...ELK服务器,介于公司服务器资源紧张(^_^)2.我们需要用到filebeat什么是filebeat?filebeat被用来shipevents,即把一台服务器上的文件日志通过socket的方式,传输到远程的ELK。可以传输到logstash,也可以直接传输到elasticsearch。3.我们这里讲 查看详情

elk入门-简单实现日志收集(代码片段)

...WEB配置Elasticsearch配置通过nginx访问elasticsearch和kibana扩展:filebeatinput配置排错方法组件简介和作用filebeat收集日志->logstash过滤/格式化->elasticsearch存储->kibana展示#个人理解其实logstash和filebeat都可以收集日志并且直接输出到elas... 查看详情

elk日志收集(代码片段)

ELK日志收集Elasticsearch#数据库,存数据JAVALogstash#收集日志,过滤数据JAVAKibana#分析,过滤,展示JAVAFilebeat#收集日志,传输到ESGO#日志收集分类代理层:nginx,haproxyweb层:nginx,tomcat数据库层:mysql,redis,mongo,elasticsearch操作系统层:source,message... 查看详情

linux安装配置elk日志收集系统,elasticsearch+kibana+filebeat轻量级配置安装(代码片段)

...1a;Elasticsearch,Logstash,Kibana,它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。Elasticsearch是... 查看详情

elk构建mysql慢日志收集平台

...(它是一个轻量级的日志采集器),Beats家族有6个成员,Filebeat工具,它是一个用于在客户端收集日志的轻量级管理工具。F也可以代表工具fluentd,它是这套架构里面常用的日志收集、处理转发的工具。那么它们(LogstashVSBeatsVSflu... 查看详情

kafka+zookeeper+filebeat+elk搭建日志收集系统(代码片段)

ELKELK目前主流的一种日志系统,过多的就不多介绍了Filebeat收集日志,将收集的日志输出到kafka,避免网络问题丢失信息kafka接收到日志消息后直接消费到LogstashLogstash将从kafka中的日志发往elasticsearchKibana对elasticsearch中的日志数据进行... 查看详情

etl工具之日志采集filebeat+logstash

...志文件,需要进行日志收集并进行可视化展示,一般使用filebeat和logstash组合。Logstash是具有实时收集日志功能,可以动态统一来自不同来源的数据,任何类型的事件都可以通过各种各样的输入,过滤功能和输出插件来丰富和转换... 查看详情