关键词:
概述
logstash 之所以强大和流行,与其丰富的过滤器插件是分不开的
过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的新事件到后续流程中
强大的文本解析工具 -- Grok
grok 是一个十分强大的 logstash filter 插件,他可以解析任何格式的文本,他是目前 logstash 中解析非结构化日志数据最好的方式
基本用法
Grok 的语法规则是:
%语法 : 语义
“语法”指的就是匹配的模式,例如使用 NUMBER 模式可以匹配出数字,IP 则会匹配出 127.0.0.1 这样的 IP 地址:
%NUMBER:lasttime%IP:client
默认情况下,所有“语义”都被保存成字符串,你也可以添加转换到的数据类型
%NUMBER:lasttime:int%IP:client
目前转换类型只支持 int 和 float
覆盖 -- overwrite
使用 Grok 的 overwrite 参数也可以覆盖日志中的信息
filter grok match => "message" => "%SYSLOGBASE %DATA:message" overwrite => [ "message" ]
日志中的 message 字段将会被覆盖
示例
对于下面的log,事实上是一个 HTTP 请求行:
55.3.244.1 GET /index.html 15824 0.043
我们可以使用下面的 logstash 配置:
input file path => "/var/log/http.log" filter grok match => "message" => "%IP:client %WORD:method %URIPATHPARAM:request %NUMBER:bytes %NUMBER:duration"
可以看到收集结果:
client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043
将无结构的数据通过这样的方式实现了结构化输出
Grok 使用正则表达式
grok 是在正则表达式的基础上实现的(使用 Oniguruma 库),因此他可以解析任何正则表达式
创建模式
提取日志字段和正则表达式提取字段的规则一样:
(?<field_name>the pattern here)
首先,创建一个模式文件,写入你需要的正则表达式:
# contents of ./patterns/postfix: POSTFIX_QUEUEID [0-9A-F]10,11
然后配置你的 Logstash:
filter grok patterns_dir => "./patterns" match => "message" => "%SYSLOGBASE %POSTFIX_QUEUEID:queue_id: %GREEDYDATA:syslog_message"
针对日志:
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
可以匹配出:
timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
IP 位置插件 -- Geoip
Logstash 1.3.0 以上可以使用 geoip 插件获取 IP 对应的地理位置,对于 accesslog 等的统计来说,IP 来源是非常有用的一个信息
使用方法
geoip source => ...
示例
filter geoip source => "message"
运行结果:
"message" => "183.60.92.253", "@version" => "1", "@timestamp" => "2014-08-07T10:32:55.610Z", "host" => "raochenlindeMacBook-Air.local", "geoip" => "ip" => "183.60.92.253", "country_code2" => "CN", "country_code3" => "CHN", "country_name" => "China", "continent_code" => "AS", "region_name" => "30", "city_name" => "Guangzhou", "latitude" => 23.11670000000001, "longitude" => 113.25, "timezone" => "Asia/Chongqing", "real_region_name" => "Guangdong", "location" => [ [0] 113.25, [1] 23.11670000000001 ]
可以看到,logstash 通过提取到的 message 字段中的 IP,解析到了地理位置相关的一系列信息
当然,对于解析出的众多数据,你也可以通过 fields 选项进行筛选
filter geoip fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
选项
上面我们看到了 source 和 fields 两个选项,geoip 还提供了下列选项:
选项 | 类型 | 是否必须 | 默认值 | 意义 |
add_field | hash | 否 | 为当前事件增加一个字段 | |
add_tag | array | 否 | [] | 为当前事件增加一个用于标识的tag |
database | path | 否 | 无 | 位置信息库所在文件 |
fields | array | 否 | 无 | 在 geoip 的返回结果中筛选部分字段 |
lru_cashe_size | int | 1000 | geoip 占用的缓存大小 | |
periodic_flush | bool | 否 | false | 是否定期调用刷新方e |
remove_field | array | 否 | [] | 从结果集中删除字段 |
remove_tag | array | 否 | [] | 从结果集中删除tag |
source | string | 是 | 无 | 需要解析的存有 IP 的字段名称 |
target | string | 否 | "geoip" | 返回的结果中保存 geoip 解析结果的字段名 |
json
对于 json 格式的 log,可以通过 codec 的 json 编码进行解析,但是如果记录中只有一部分是 json,这时候就需要在 filter 中使用 json 解码插件
示例
filter json source => "message" target => "jsoncontent"
运行结果:
"@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": ""uid":3081609001,"type":"signal"", "jsoncontent": "uid": 3081609001, "type": "signal"
上面的例子中,解析结果被放到了 target 所指向的节点下,如果希望将解析结果与 log 中其他字段保持在同一层级输出,那么只需要去掉 target 即可:
"@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": ""uid":3081609001,"type":"signal"", "uid": 3081609001, "type": "signal"
时间分割 -- split
mutiline 让 logstash 将多行数据变成一个事件,当然了,logstash 同样支持将一行数据变成多个事件
logstash 提供了 split 插件,用来把一行数据拆分成多个事件
示例:
filter split field => "message" terminator => "#"
运行结果:
对于 "test1#test2",上述 logstash 配置将其变成了下面两个事件:
"@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "test1" "@version": "1", "@timestamp": "2014-11-18T08:11:33.000Z", "host": "web121.mweibo.tc.sinanode.com", "message": "test2"
需要注意的是,当 split 插件执行结束后,会直接进入 output 阶段,其后的所有 filter 都将不会被执行
数据修改 -- mutate
logstash 还支持在 filter 中对事件中的数据进行修改
重命名 -- rename
对于已经存在的字段,重命名其字段名称
filter mutate rename => ["syslog_host", "host"]
更新字段内容 -- update
更新字段内容,如果字段不存在,不会新建
filter mutate update => "sample" => "My new message"
替换字段内容 -- replace
与 update 功能相同,区别在于如果字段不存在则会新建字段
filter mutate replace => "message" => "%source_host: My new message"
数据类型转换 -- convert
filter mutate convert => ["request_time", "float"]
文本替换 -- gsub
gsub 提供了通过正则表达式实现文本替换的功能
filter mutate gsub => [ # replace all forward slashes with underscore "fieldname", "/", "_", # replace backslashes, question marks, hashes, and minuses # with a dot "." "fieldname2", "[?#-]", "." ]
大小写转换 -- uppercase、lowercase
filter mutate uppercase => [ "fieldname" ]
去除空白字符 -- strip
类似 php 中的 trim,只去除首尾的空白字符
filter mutate strip => ["field1", "field2"]
删除字段 -- remove、remove_field
remove 不推荐使用,推荐使用 remove_field
filter mutate remove_field => [ "foo_%somefield" ]
删除字段 -- remove、remove_field
remove 不推荐使用,推荐使用 remove_field
filter mutate remove_field => [ "foo_%somefield" ]
分割字段 -- split
将提取到的某个字段按照某个字符分割
filter mutate split => ["message", "|"]
针对字符串 "123|321|adfd|dfjld*=123",可以看到输出结果:
"message" => [ [0] "123", [1] "321", [2] "adfd", [3] "dfjld*=123" ], "@version" => "1", "@timestamp" => "2014-08-20T15:58:23.120Z", "host" => "raochenlindeMacBook-Air.local"
聚合数组 -- join
将类型为 array 的字段中的 array 元素使用指定字符为分隔符聚合成一个字符串
如我们可以将 split 分割的结果再重新聚合起来:
filter mutate split => ["message", "|"] mutate join => ["message", ","]
输出:
"message" => "123,321,adfd,dfjld*=123", "@version" => "1", "@timestamp" => "2014-08-20T16:01:33.972Z", "host" => "raochenlindeMacBook-Air.local"
合并数组 -- merge
对于几个类型为 array 或 hash 或 string 的字段,我们可以使用 merge 合并
filter mutate merge => [ "dest_field", "added_field" ]
需要注意的是,array 和 hash 两个字段是不能 merge 的
logstash配置文件详解(代码片段)
logstashpipeline包含两个必须的元素:input和output,和一个可选元素:filter。 从input读取事件源,(经过filter解析和处理之后),从output输出到目标存储库(elasticsearch或其他)。 在生产环境使用logstash,一般使用都将配... 查看详情
logstash常用filter插件介绍(代码片段)
Filter是Logstash功能强大的主要原因,它可以对LogstashEvent进行丰富的处理,比如说解析数据、删除字段、类型转换等等,常见的有如下几个:date:日志解析grok:正则匹配解析dissect:分割符解析mutate:对字段做处理,比如重命名、... 查看详情
logstash清除sincedb_path上传记录,重传日志数据(代码片段)
logstash清除sincedb_path上传记录,重传日志数据logstash通过一个名为 sincedb_path下的记录文件记录当前logstash已经上传的日志文件的位置,如果指定为null,则使用home默认的。清除这个sincedb_path下的文件,将导致logstash... 查看详情
elk之logstash学习(代码片段)
Logstash最强大的功能在于丰富的过滤器插件。此过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理。甚至添加独特的事件到后续流程中。1、logstash基本语法组成logstash主要由三部分组成:in... 查看详情
logstash:使用ruby过滤器(代码片段)
Logstash具有丰富的过滤器集,你甚至可以编写自己的过滤器,但是由于没有现成的过滤器,你可以直接将Ruby代码入配置文件中,因此通常不必创建自己的过滤器。使用logstash-filter-ruby,你可以使用Ruby字符串操... 查看详情
logstash备份跟踪上报的数据记录(代码片段)
logstash备份跟踪上报的数据记录logstash可以从被监测的文件上采集数据上报到指定的http服务器或者es等。有时候需要确认或者跟踪记录logstash到底成功上报的历史数据记录,可以在本地创建一个文件,用以记录、跟踪、备... 查看详情
使用docker部署filebeat和logstash(代码片段)
想用filebeat读取项目的日志,然后发送logstash。logstash官网有相关的教程,但是docker部署的教程都太简洁了。自己折腾了半天,走了不少坑,总算是将logstash和filebeat用docker部署好了,这儿简单记录一下 部署logstash1.编写logstash... 查看详情
ini使用nxlog和json传输将windows事件日志信息记录到logstash(代码片段)
logstash(代码片段)
简略介绍:Logstash是elastic技术栈中的一员。它是一个数据采集引擎,可以从数据库采集数据到es中。我们可以通过设置自增id主键或者时间来控制数据的自动同步,这个id或者时间就是用于给logstash进行识别哪些数据需要同步的。... 查看详情
logstash:处理多个input(代码片段)
我们知道Logstash的架构如下: 它的整个pipleline分为三个部分: input插件:提取数据。这可以来自日志文件,TCP或UDP侦听器,若干协议特定插件(如syslog或IRC)之一,甚至是排队系统(如Redis,AQMP或Kafka)。此阶段... 查看详情
elk之logstash配置文件详解(代码片段)
Logstash是一个开源的数据收集引擎,它具有备实时数据传输能力。它可以统一过滤来自不同源的数据,并按照开发者的制定的规范输出到目的地。它以插件的形式来组织功能,通过配置文件来描述需要插件做什么,配置文件... 查看详情
logstash:处理多个input(代码片段)
Logstash:处理多个inputLogstash的整个pipleline分为三个部分:input插件:提取数据。这可以来自日志文件,TCP或UDP侦听器,若干协议特定插件(如syslog或IRC)之一,甚至是排队系统(如Redis,AQMP或Kafka)。此阶段使用围绕事件来源的... 查看详情
logstash(代码片段)
简略介绍:Logstash是elastic技术栈中的一员。它是一个数据采集引擎,可以从数据库采集数据到es中。我们可以通过设置自增id主键或者时间来控制数据的自动同步,这个id或者时间就是用于给logstash进行识别哪些数据需要同步的。... 查看详情
logstash(代码片段)
1.概述 logstash是一个日志转化系统,用户通过定义一个input,filter,和一个output配置来完成日志的收集和存储工作。2.数据类型 booldebug=>truebytes my_bytes=>"113"#113bytes stringhost=>"hostname" numberport=>214 arrayma... 查看详情
使用logstashfiltergrok过滤日志文件(代码片段)
Logstash提供了一系列filter过滤plugin来处理收集到的logevent,根据logevent的特征去切分所需要的字段,方便kibana做visualize和dashboard的dataanalysis。所有logstash支持的event切分插件查看这里。下面我们主要讲grok切分。Grok基本介绍Grok使用... 查看详情
crm-项目记录(代码片段)
跨域问题SPRINGMVC配置了跨域,也使用了跨域注解,但是依然不能解决问题最后通过直接修改TOMCAT的WEB.XML文件解决的 打开tomcat安装目录->打开conf目录->打开web.xml文件 <filter><filter-name>CorsFilter</filter-name><f... 查看详情
elasticsearch+logstash+beats+kibana(代码片段)
综合练习下面将所学习到的Elasticsearch+Logstash+Beats+Kibana整合起来做一个综合性的练习,目的就是让学生们能够更加深刻的理解ElasticStack的使用。1.1、流程说明应用APP生产日志,用来记录用户的操作[INFO]2020-05-2716:49:48[cn.itcast.dashboar... 查看详情
logstash使用filter删除不需要的日志
参考技术A日志的流转路径:logstash收集log4j的日志,并对日志进行过滤,输出给elasticsearch,kibana从es的索引中查询数据进行展示。有一部分日志没有多大的意义,但是占据了很大的磁盘空间,因此想通过filter将其过滤掉,再将有... 查看详情