logstash-2-插件配置

bronk bronk     2022-09-16     403

关键词:

配置语法:  Logstash必须有一个 input 和一个 output

1, 处理输入的input 

1), 从文件录入

logstash使用一个名为 filewatch的 ruby gem库来监听文件变化, 这个库记录一个 .sincedb的数据文件跟踪监听日志文件的当前位置

input {
    file {
        path => ["/var/log/*.log", "/var/log/message"]
        type => "system"
        start_position => "beginning"
    }
}
output {
    stdout{
        codec=>rubydebug    
    }
}
 

其他配置

discover_interval: 每隔多久检查path下是否有新文件, 默认15s
exclude: 不行呗监听的文件排除
close_older: 被监听的文件多久没更新就关闭监听, 默认3600s
ignore_older: 检查文件列表时, 如果最后修改时间超过这个值, 就虎烈

2) 标准输入: (Stdin)

logstash最简单最基本的输入方式

在 {LH}/下, 新建  stdin.conf, 并输入以下内容: 

input{
    stdin{
        add_field=>{"key"=>"value"}
        codec=>"plain"
        tags=>["add"]
        type=>"std"
    }
}
output {
    stdout{
        codec=>rubydebug    
    }
}

使用命令运行

./bin/logstash -f ./stdin.conf

启动后输入 helloworld, 可以看到如下输出

这儿的 type 和tags 是logstash的两个特俗字段, 通常会在输入区域通过type标记事件类型, tags则在数据处理阶段, 由具体的插件来添加或删除的

 3) syslog

 从设备上收集日志的时候可用 

input {
  syslog {
    port => "514"
  }
}
output {
    stdout{
        codec=>rubydebug    
    }
}

此时, 系统的日志都会到logstash中来, 建议使用使用LogStash::Inputs::TCP和 LogStash::Filters::Grok 配合实现同样的 syslog 功能! 具体可见: https://kibana.logstash.es/content/logstash/plugins/input/syslog.html

input {
  tcp {
    port => "8514"
  }
}
filter {
  grok {
    match => ["message", "%{SYSLOGLINE}" ]
  }
  syslog_pri { }
}

4) 网络数据读取, tcp

可被redis,等替代作为 logstash broker 的角色, 但logstash又自己的tcp插件 

input {
    tcp {
        port => 8888
        mode => "server"
        ssl_enable => false
    }
}

最佳使用是: 配合  nc 命令导入就数据

# nc 127.0.0.1 8888 < olddata

导入完毕后, nc命令会结束, 如果使用file会一直监听新数据

 

2 编码插件 codec

使用codec可以处理不同类型的数据,  使得logstash 形成 input | decode | filter | encode | output 的数据流, codec就是用来 encode 和 decode的

1), json格式

将nginx的日志导入为json格式: nginx需要配置 conf, 在 http{} 下进行配置, 所有server共享

logformat json '{"@timestamp":"$time_iso8601",'
               '"@version":"1",'
               '"host":"$server_addr",'
               '"client":"$remote_addr",'
               '"size":$body_bytes_sent,'
               '"responsetime":$request_time,'
               '"domain":"$host",'
               '"url":"$uri",'
               '"status":"$status"}';
access_log /var/log/nginx/access.log_json json;

修改stdin.conf

input {
    file {
        path => "/var/log/nginx/access.log_json"
        codec => "json"
    }
}

然后访问本地nginx, 可以看到logstash输出: 

 

2), multiline 合并多行数据

 一个事件打印多行内容, 很难通过命令行解析分析, 因此需要: 

input {
    stdin {
        codec => multiline {
            pattern => "^\["
            negate => true
            what => "previous"
        }
    }
}

将当前的数据添加到下一行后面, 知道新匹配 ^[ 位置

3, filter插件: 

扩展了进入过滤器的原始数据,进行复杂的逻辑处理,甚至可以无中生有的添加新的 logstash 事件到后续的流程中去!

1) 时间处理

logstash内部使用了java的 joda 时间库来处理时间 

filter {
    grok {
        match => ["message", "%{HTTPDATE:logdate}"]
    }
    date {
        match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
}

 

2) grok, 正则捕获

 可以将输入的文本匹配到字段中去: 

input {stdin{}}
filter {
    grok {
        match => {
            "message" => "\s+(?<request_time>\d+(?:\.\d+)?)\s+"
        }
    }
}
output {stdout{codec => rubydebug}}

然后输入  begin 123.456 end

 

 

grok支持预定义的grok表达式: (自己的变量)

%{PATTERN_NAME:capture_name:data_type}

 所以上例可改成: 

filter {
    grok {
        match => {
            "message" => "%{WORD} %{NUMBER:request_time:float} %{WORD}"
        }
    }
}

重新运行后, request_time的值变为float类型的, 

实际使用中: 建议把所有的fork表达式统一写在一个地方, 然后patterns_dir指明. 如果将message中的所有信息都grok到不通字段了, 数据就存储重复了, 因此可以用remove_filed或者 overwrite来重写message

filter {
    grok {
        patterns_dir => ["/path/to/your/own/patterns"]
        match => {
            "message" => "%{SYSLOGBASE} %{DATA:message}"
        }
     data => {
        "match" => ["date1", "YYYY-MM-dd HH:mm:ss.SSS" ]
     } overwrite
=> ["message"] } }

冒号(:) 可以重新命名

附: grok 正则变量类型: https://github.com/wenbronk/elasticsearch-elasticsearch-learn/blob/master/grok%E5%86%85%E9%83%A8%E5%8F%98%E9%87%8F.txt

 

3) dissect

跟grok类似, 但资源消耗较小. 当日志格式有比较简明的分隔标志位,而且重复性较大的时候,我们可以使用 dissect 插件更快的完成解析工作 

filter {
    dissect {
        mapping => {
            "message" => "%{ts} %{+ts} %{+ts} %{src} %{} %{prog}[%{pid}]: %{msg}"
        }
        convert_datatype => {
            pid => "int"
        }
    }
}

比如配置: http://rizhiyi.com/index.do?id=123

http://%{domain}/%{?url}?%{?arg1}=%{&arg1}

匹配后
{
  domain => "rizhiyi.com",
  id => "123"
}

解释

%{+key} 这个 + 表示,前面已经捕获到一个 key 字段了,而这次捕获的内容,自动添补到之前 key 字段内容的后面。
%{+key/2} 这个 /2 表示,在有多次捕获内容都填到 key 字段里的时候,拼接字符串的顺序谁前谁后。/2 表示排第 2 位。
%{?string} 这个 ? 表示,这块只是一个占位,并不会实际生成捕获字段存到 Event 里面。
%{?string} %{&string} 当同样捕获名称都是 string,但是一个 ? 一个 & 的时候,表示这是一个键值对。

4) geoip: 免费的ip地址归类查询库, 可根据ip提供对应的低于信息, 包括省,市,经纬度,, 可视化地图统计等

input {stdin{}}
filter {
    geoip {
        source => "message"
    }
}
output {stdout{codec => rubydebug}}

运行结果

如果只想要其中某些字段, 可以通过fileds来指定

 

geoip {
        fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
    }

5, metrics, filters/metrics 插件是使用 Ruby 的 Metriks 模块来实现在内存里实时的计数和采样分析

 最近一分钟 504 请求的个数超过 100 个就报警:


filter {
    metrics {
        timer => {"rt" => "%{request_time}"}
        percentiles => [25, 75]
        add_tag => "percentile"
    }
    if "percentile" in [tags] {
        ruby {
            code => "l=event.get('[rt][p75]')-event.get('[rt][p25]');event.set('[rt][low]', event.get('[rt][p25]')-l);event.set('[rt][high]',event.get('[rt][p75]')+l)"
        }
    }
}
output {
    if "percentile" in [tags] and ([rt][last] > [rt][high] or [rt][last] < [rt][low]) {
        exec {
            command => "echo \"Anomaly: %{[rt][last]}\""
        }
    }
}
 

 

 6, mutate, 类型转换

可转换的类型包括  integer, float, string

filter {
    mutate {
        convert => ["request_time", "float"]
    }
}

字符串处理: , sub

gsub => ["urlparams", "[\\?#]", "_"]

split: 

filter {
    mutate {
        split => ["message", "|"]
    }
}

join, 将split切分的在join回去

filter {
    mutate {
        split => ["message", "|"]
    }
    mutate {
        join => ["message", ","]
    }
}

rename: 字段重命名:

filter {
    mutate {
        rename => ["syslog_host", "host"]
    }
}

7, split切分

是multiline插件的反向, 将一行数据切分到多个事件中去

filter {
    split {
        field => "message"
        terminator => "#"
    }
}

然后输入  "test1#test2", 可以看到被输出到2个事件中

4, output

 1, 标准输出 (Stdout)

output {
    stdout {
        codec => rubydebug
        workers => 2
    }
}

2, 输出到es

output {
    elasticsearch {
        hosts => ["192.168.0.2:9200"] # 有多个用逗号隔开
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        document_type => "%{type}"
        flush_size => 20000
        idle_flush_time => 10
        sniffing => true
        template_overwrite => true
    }
}

 

注意索引名中不能有大写字母,否则 ES 在日志中会报 InvalidIndexNameException,但是 Logstash 不会报错,这个错误比较隐晦,也容易掉进这个坑中。

 3), email

126邮箱发送到 qq邮箱的示例

output {
    email {
        port           =>    "25"
        address        =>    "smtp.126.com"
        username       =>    "test@126.com"
        password       =>    ""
        authentication =>    "plain"
        use_tls        =>    true
        from           =>    "test@126.com"
        subject        =>    "Warning: %{title}"
        to             =>    "test@qq.com"
        via            =>    "smtp"
        body           =>    "%{message}"
    }
}

 

elasticsearch数据库同步插件logstash

   1.下载和elasticsearch相同版本的logstash.  2.进行解压后,进入bin下,新建一个文件mysql.conf,并输入inputstdinoutputstdout3.cmd进入bin下,输入logstash-fmysql.conf 启动后,输入  http://127.0.0.1:9600/4.ok;说明已经启动了   查看详情

logstash中多行合并

这里我之前是在input里面配置的多行合并,合并语法为:input{beats{type=>beatsport=>7001codec=>multiline{patterns_dir=>["/data/package/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns"]pat 查看详情

logstash同步mysql数据到elasticsearch(代码片段)

目录1MySql数据到Elasticsearch1.1下载logstash1.2解压logstash1.3在logstash目录创建mysql文件夹1.4将mysql驱动文件和数据库查询文件放进mysql中1.5在config目录下创建mysqltoes.conf文件1.6mysqltoes.conf配置1.7启动logstash2配置语法讲解3启动方式4filebeat基... 查看详情

mysql数据同步到elasticsearch

阅读目录说明1同步原理2插件logstash-input-jdbcgo-mysql-elasticsearchelasticsearch-jdbc3logstash-input-jdbc实现同步安装Logstash6.2.3说明要通过elasticsearch实现数据检索,首先要将mysql中的数据导入elasticsearch,并实现数据源与elasticsearch数据同步,这... 查看详情

剑指架构师系列-logstash分布式系统的日志监控

 Logstash主要做由三部署组成:Collect:数据输入Enrich:数据加工,如过滤,改写等Transport:数据输出下面来安装一下:wgethttps://download.elastic.co/logstash/logstash/logstash-2.3.2.tar.gztar-zxvflogstash-2.3.2.tar.gz  在logstash-2.3.2目录下创建文 查看详情

logstash2.2以上版本,nginx错误日志切割

  网上nginx错误日期切分的版本各式各样,能用的没几个,踩过很多坑,特意记录下:if[type]=="xx_app_nginx_error_log"{grok{patterns_dir=>"/etc/logstash/conf.d/patterns"match=>{"message"=>"%{NGINXERROR_1}"}}mutate{#避免日期报错gs 查看详情

elk安装

wgethttps://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.4.0/elasticsearch-2.4.0.tar.gzwgethttps://download.elastic.co/logstash/logstash/logstash-2.4.0. 查看详情

日志分析第五章安装logstash

logstash是java应用,依赖JDK,首先需要安装JDK,在安装jdk过程中,logstash-2.3.4使用JDK-1.7版本有bug,使用JDK-1.8版本正常,因此我们安装JDK-1.8版本。安装JDK官网地址:http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html#rpm- 查看详情

为啥logstash进程的cpu使用率100

...mpOnOutOfMemoryError-Xmx128m-Xss2048k-Djffi.boot.library.path=/opt/dtstack/logstash-2.1.1/vendor/jruby/lib/jni-XX:+UseParNewGC-XX:+UseConcMarkSweepGC-Djava.awt.headless=true-XX:CMSInitiatingOccupancyFraction=75-XX:+UseCMSInitiatingOccupancyOnly-XX:+HeapDumpOnOutOfMemoryError-XX:HeapDumpPath=/opt/dts... 查看详情

logstash使用fileset.name重命名字段(代码片段)

我正在使用Logstash6.2.1,它从Filebeat收集syslogs。我收到两个字段(如Kibana所示),fileset.name和fileset.module分别带有值syslog和system。我希望使用以下过滤器在Logstash中重命名这些字段filtermutaterename=>"fileset.module"=>"category"但字段名... 查看详情

一个logstash引发的连环案,关于logstash提示:reachedopenfileslimit:4095,setbythe'max_open_files'op(代码片段)

不多说,直接上问题。版本logstash-2.4.0,启动后提示错误:!!!Pleaseupgradeyourjavaversion,thecurrentversion‘1.7.0_45-mockbuild_2013_10_22_03_37-b00‘maycauseproblems.Werecommendaminimumversionof1.7.0_51:timestamp=>"2018- 查看详情

基于docker的elk高可用集群架构(代码片段)

目录一、规划1.1主机规划1.2整体架构二、部署2.1ES集群2.2Logstash分流2.3Kibana前端展示2.4Nginx反向代理2.5Zookeeper集群2.6Kafka集群2.7Filebeat轻量级数据收集引擎2.7.1架构图2.7.2部署及应用三、总结FAQ一、规划1.1主机规划ServiceVersion角色192.1... 查看详情

androidgradle插件自定义gradle插件模块②(在模块中定义插件|引入自定义gradle插件模块|配置gradle插件上传选项|配置分组名称版本号)(代码片段)

文章目录一、在JavaorKotlinLibrary模块中定义插件二、引入自定义Gradle插件模块三、配置自定义Gradle插件上传选项四、配置Group分组、插件名称、插件版本号五、自定义Gradle插件的完整build.gradle构建脚本AndroidPluginDSLReference参考文档:A... 查看详情

androidgradle插件gradle依赖管理①(org.gradle.api.project配置|androidgradle插件配置与gradle配置关联)★(代码片段)

文章目录一、org.gradle.api.Project配置二、AndroidGradle插件配置与Gradle配置关联AndroidPluginDSLReference参考文档:AndroidGradle插件配置与Gradle配置关联:【AndroidGradle插件】Gradle依赖管理①(org.gradle.api.Project配置|AndroidGradle插件配置与Gradle配置... 查看详情

spacemacs配置yasnippe插件(代码片段)

Spacemacs配置yasnippe插件简介spacemacs也可以像vim一样配置快捷代码块,提高输入的效率。当前比较好用的插件是yasnippet。安装yasnippet插件修改.spacemacs配置文件dotspacemacs-additional-packages‘(;;快捷代码块插件yasnippet;;常用代码块yasnippet-s... 查看详情

vscode配置svn插件

vscode配置svn插件vscode配置svn(已更新新版本vs) 查看详情

跨不同的maven插件共享配置(代码片段)

我有两个Maven插件,A和B。它们是独立开发的,但它们恰好需要相同的配置选项。大多数潜在用户将安装插件A,插件B是它的补充。我正在尝试寻找一种在插件之间共享配置的方法。我知道您可以将配置选项提升为属性,然后为每... 查看详情

elk收集网络设备日志(代码片段)

ELK版本:elasticsearch-5.2.2kibana-5.2.2logstash-2.4.1allplugins部署环境为单台服务器,未配置集群一、官网下载地址:https://www.elastic.co/downloads二、elasticsearch安装1、由于安装ELK需要jdk,因此没有jdk的,先安装1.8版本以上[root@ELK-1~]#rpm-ivhjdk-8u... 查看详情