elk之filebeat结合logstash过滤出来你想要的日志(代码片段)

author author     2022-11-08     176

关键词:

先扯点没用的

收集日志的目的是有效的利用日志,有效利用日志的前提是日志经过格式化符合我们的要求,这样才能真正的高效利用收集到elasticsearch平台的日志。默认的日志到达elasticsearch 是原始格式,乱的让人抓狂,这个时候你会发现Logstash filter的可爱之处,它很像一块橡皮泥,如果我们手巧的话就会塑造出来让自己舒舒服服的作品,but 如果你没搞好的话那就另说了,本文的宗旨就是带你一起飞,搞定这块橡皮泥。当你搞定之后你会觉得kibana 的世界瞬间清爽了很多!
FIlebeat 的4大金刚
Filebeat 有4个非常重要的概念需要我们知道,
Prospector(矿工);
Harvest (收割者);
libeat (汇聚层);
registry(注册计入者);
Prospector 负责探索日志所在地,就如矿工一样要找矿,而Harvest如矿主一样的收割者矿工们的劳动成果,哎,世界无处不剥削啊!每个Prospector 都有一个对应的Harvest,然后他们有一个共同的老大叫做Libbeat,这个家伙会汇总所有的东西,然后把所有的日志传送给指定的客户,这其中还有个非常重要的角色”registry“,这个家伙相当于一个会计,它会记录Harvest 都收割了些啥,收割到哪里了,这样一但有问题了之后,harvest就会跑到会计哪里问:“上次老子的活干到那块了”?Registry 会告诉Harvest 你Y的上次干到哪里了,去哪里接着干就行了。这样就避免了数据重复收集的问题!

FIlebeat 详细配置:

filebeat.prospectors:
- input_type: log
  enabled: True
  paths:
    - /var/log/mysql-slow-*
#这个地方是关键,我们给上边日志加上了tags,方便在logstash里边通过这个tags 过滤并格式化自己想要的内容;
  tags: ["mysql_slow_logs"]
#有的时候日志不是一行输出的,如果不用multiline的话,会导致一条日志被分割成多条收集过来,形成不完整日志,这样的日志对于我们来说是没有用的!通过正则匹配语句开头,这样multiline 会在匹配开头
之后,一直到下一个这样开通的语句合并成一条语句。
#pattern:多行日志开始的那一行匹配的pattern
#negate:是否需要对pattern条件转置使用,不翻转设为true,反转设置为false
#match:匹配pattern后,与前面(before)还是后面(after)的内容合并为一条日志
#max_lines:合并的最多行数(包含匹配pattern的那一行 默认值是500行
#timeout:到了timeout之后,即使没有匹配一个新的pattern(发生一个新的事件),也把已经匹配的日志事件发送出去
  multiline.pattern: ‘^\d4/\d2/\d2‘  (2018\05\01  我的匹配是已这样的日期开头的)
  multiline.negate: true
  multiline.match: after
  multiline.Max_lines:20
  multiline.timeout: 10s
- input_type: log
  paths:
    - /var/log/mysql-sql-*
  tags: ["mysql_sql_logs"]
  multiline.pattern: ‘^\d4/\d2/\d2‘
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 10s
  encoding: utf-8
  document_type: mysql-proxy
  scan_frequency: 20s
  harverster_buffer_size: 16384
  max_bytes: 10485760
  tail_files: true
 #tail_files:如果设置为true,Filebeat从文件尾开始监控文件新增内容,把新增的每一行文件作为一个事件依次发送,而不是从文件开始处重新发送所有内容。默认是false;

Logstash 的详细配置

input 
  kafka 
    topics_pattern => "mysql.*"
    bootstrap_servers => "x.x.x.x:9092"
    #auto_offset_rest => "latest"
    codec => json
    group_id => "logstash-g1"
   

#终于到了关键的地方了,logstash的filter ,使用filter 过滤出来我们想要的日志,
filter 
     #if 还可以使用or 或者and 作为条件语句,举个栗子: if “a” or “b”  or “c” in [tags],这样就可以过滤多个tags 的标签了,我们这个主要用在同型号的交换设备的日志正规化,比如说你有5台交换机,把日志指定到了同一个syslog-ng 上,收集日志的时候只能通过同一个filebeat,多个prospector加不同的tags。这个时候过滤就可以通过判断相应的tags来完成了。
    if "mysql_slow_logs" in [tags] 
    grok 
     #grok 里边有定义好的现场的模板你可以用,但是更多的是自定义模板,规则是这样的,小括号里边包含所有一个key和value,例子:(?<key>value),比如以下的信息,第一个我定义的key是data,表示方法为:?<key> 前边一个问号,然后用<>把key包含在里边去。value就是纯正则了,这个我就不举例子了。这个有个在线的调试库,可以供大家参考,http://grokdebug.herokuapp.com/ 
        match =>  "message" => "(?<date>\d4/\d2/\d2\s(?<datetime>%TIME))\s-\s(?<status>\w2)\s-\s(?<respond_time>\d+)\.\d+\w2\s-\s%IP:client:(?<client-port>\d+)\[\d+\]->%IP:server:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"
#过滤完成之后把之前的message 移出掉,节约磁盘空间。
        remove_field => ["message"]
    
  
    else if "mysql_sql_logs" in [tags]
    grok 
        match =>  "message" => "(?<date>\d4/\d2/\d2\s(?<datetime>%TIME))\s-\s(?<status>\w2)\s-\s(?<respond_time>\d+\.\d+)\w2\s-\s%IP:client:(?<client-port>\d+)\[\d+\]->%IP:server:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"
        remove_field => ["message"]
  

我要过滤的日志样本:

2018/05/01 16:16:01.892 - OK - 759.2ms - 172.29.1.7:35184[485388]->172.7.1.39:3306[1525162561129639717]:<DB>:select count(*) from test[];
过滤后的结果如下:

  "date": [
    [
      "2018/05/01 16:16:01.892"
    ]
  ],
  "datetime": [
    [
      "16:16:01.892"
    ]
  ],
  "TIME": [
    [
      "16:16:01.892"
    ]
  ],
  "HOUR": [
    [
      "16"
    ]
  ],
  "MINUTE": [
    [
      "16"
    ]
  ],
  "SECOND": [
    [
      "01.892"
    ]
  ],
  "status": [
    [
      "OK"
    ]
  ],
  "respond_time": [
    [
      "759"
    ]
  ],
  "client": [
    [
      "172.29.1.7"
    ]
  ],
  "IPV6": [
    [
      null,
      null
    ]
  ],
  "IPV4": [
    [
      "172.29.1.7",
      "172.7.1.39"
    ]
  ],
  "client-port": [
    [
      "35184"
    ]
  ],
  "server": [
    [
      "172.7.1.39"
    ]
  ],
  "server-port": [
    [
      "3306"
    ]
  ],
  "databases": [
    [
      "<DB>"
    ]
  ],
  "SQL": [
    [
      "select count(*) from test[];"
    ]
  ]

#有图有真相:
技术分享图片

elk之数据收集传输过滤filebeat+logstash部署

...#前端和消息队列搞定之后,我们需要安装数据采集工具filebeats和数据过滤机运输工具Logstash,一般情况我们都使用filebeats用来收集日志文件,我自定义了一个log文件,文件内容如下:55.3.244.1GET/index.html158240.04355.3.244.1GET/index. 查看详情

海量日志下的日志架构优化:filebeat+logstash+kafka+elk(代码片段)

前言:实验需求说明在前面的日志收集中,都是使用的filebeat+ELK的日志架构。但是如果业务每天会产生海量的日志,就有可能引发logstash和elasticsearch的性能瓶颈问题。因此改善这一问题的方法就是filebeat+logstash+kafka+ELK,也就是将... 查看详情

etl工具之日志采集filebeat+logstash

...志文件,需要进行日志收集并进行可视化展示,一般使用filebeat和logstash组合。Logstash是具有实时收集日志功能,可以动态统一来自不同来源的数据,任何类型的事件都可以通过各种各样的输入,过滤功能和输出插件来丰富和转换... 查看详情

elk之logstash(代码片段)

...:设置数据来源; 常用:File、syslog、redis、beats(如:Filebeats)   filter:对数据进行一定的加工处理过滤,但不建议做复杂的处理逻辑。此步骤不是必须的;常用:grok、mutate、drop、clone、geoip   output:设... 查看详情

filebeat+logstash对message提取指定字段

参考技术Afilebeat中message要么是一段字符串,要么在日志生成的时候拼接成json然后在filebeat中指定为json。但是大部分系统日志无法去修改日志格式,filebeat则无法通过正则去匹配出对应的field,这时需要结合logstash的grok来过滤,架... 查看详情

elasticsearch:elk架构(代码片段)

...ogstash导入数据到ES同步数据库数据到Elasticsearch什么是BeatsFileBeat简介FileBeat的工作原理logstashvsFileBeatFilebeat安装ELK整合实战案例:采集tomcat服务器日志使用FileBeats将日志发送到Logstash配置Logstash接收FileBeat收集的数据并打印Logstas... 查看详情

ELK 堆栈中的 Logstash 和 filebeat

】ELK堆栈中的Logstash和filebeat【英文标题】:LogstashandfilebeatintheELKstack【发布时间】:2019-08-1620:14:04【问题描述】:我们正在服务器上设置elasticsearch、kibana、logstash和filebeat,以分析来自许多应用程序的日志文件。由于原因*,每个... 查看详情

日志分析系统elk(elasticsearch+logstash+kibana+filebeat)

...;二、安装Logstash​​​​三、安装Kibana​​​​四、安装Filebeat​​​​五、集群模式​​搭建日志分析系统ELK(elasticsearch+logstash+kibana+filebeat)这里先介绍ELK的安装 首先下载ELK在官网下载:​​https://www.elastic.co/cn/downloads/​ 查看详情

elk入门-简单实现日志收集(代码片段)

...WEB配置Elasticsearch配置通过nginx访问elasticsearch和kibana扩展:filebeatinput配置排错方法组件简介和作用filebeat收集日志->logstash过滤/格式化->elasticsearch存储->kibana展示#个人理解其实logstash和filebeat都可以收集日志并且直接输出到elas... 查看详情

elk的logstash怎么过滤出报错的模块

...采集灵活性是我们选择日志采集方案更看重的因素,所以logstash属于首先方案,它可以兼顾多种不同系统和应用类型等因素的差异,从源头上进行一些初步的日志预处理。logstash唯一的小缺憾是它的不轻便,因为它是使用jruby开发... 查看详情

elk做日志分析(filebeat+logstash+elasticsearch)配置(代码片段)

利用Filebeat去读取日志发送到Logstash,再由Logstash处理后发送给Elasticsearch。一、Filebeat项目日志文件:利用Filebeat去读取文件,paths下面配置路径地址,Filebeat会自动去读取/data/share/business_log/TA-*/debug.log文件#===========================Filebea... 查看详情

elk应用之filebeat

参考技术AFilebeat是本地文件的日志数据采集器,可监控日志目录或特定日志文件(tailfile),并将它们转发给Elasticsearch或Logstatsh进行索引、kafka等。带有内部模块(auditd,Apache,Nginx,System和MySQL),可通过一个指定命令来简化通... 查看详情

elk(代码片段)

...综合实验案例一:单机ELK部署案例二.JAVA环境配置,部署filebeat+Elasticsearch收集apache日志nginx日志收集配置mysqlslow慢日志收集elk简介ELK是三个开源软件的缩写,分别表示:Elasticsearch,Logstash,Kibana,它们都是开源软件。新增了一个FileBea... 查看详情

日志分析系统elk之logstash(代码片段)

Logstash什么是ELKLogstash简介Logstash组成1、输入2、过滤器(可选)3、输出Logstash安装与配置通过命令行运行Logstash参数-e参数-f日志输出到文件日志上传到elasticsearchLogstash伪装为日志服务器grok过滤插件分割命令行的信息输出... 查看详情

在 ELK 堆栈中调试 Filebeat

】在ELK堆栈中调试Filebeat【英文标题】:DebuggingFilebeatintheELKstack【发布时间】:2018-04-1420:22:56【问题描述】:我的ELK系统出现了一些问题。客户端工作如下:Filebeat->Logstash-->Elastic-->Kibana我们的部分日志不会从特定机器到达... 查看详情

elk之logstash学习(代码片段)

Logstash最强大的功能在于丰富的过滤器插件。此过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理。甚至添加独特的事件到后续流程中。1、logstash基本语法组成logstash主要由三部分组成:in... 查看详情

filebeat、logstash过滤器实例

参考技术A主要描述filebeat和logstash的过滤器使用。参考:  filebeat:  https://www.elastic.co/guide/en/beats/filebeat/6.8/defining-processors.html  logstash:  https://www.elastic.co/guide/en/logstash/6.8/filter-plugins.html版本:elastics... 查看详情

elk

#环境centos7.4,ELK6,单节点 #服务端Logstash收集,过滤  Elasticsearch存储,索引日志Kibana可视化 #客户端filebeat监控、转发,作为agentfilebeat-->Logstash-->Elasticsearch-->Kibana #内核echo‘*hardnofile65536* 查看详情