Logstash configuration files explained (code snippets)

wzxmt    2022-12-20


 A Logstash pipeline has two required elements, input and output, and one optional element, filter.

 Events are read from the input source, parsed and processed by the filter (if present), and written by the output to the target store (Elasticsearch or another destination).
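
 The smallest possible pipeline just wires an input to an output; a minimal sketch you can paste into a file and run, reading from the keyboard and printing each event back:

    input {
        stdin { }
    }
    output {
        stdout {
            codec => "rubydebug"   # pretty-print the full event structure
        }
    }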


  In production, the configuration is normally written to a file, and Logstash is started against that file.

  Processing nginx logs

# vim nginx_access.conf
input {
    file {
        path => "/var/log/nginx/access.log"
        start_position => "beginning"
        type => "nginx_access_log"
    }
}

filter {
    grok {
        match => { "message" => [
            "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"(?:-|%{DATA:referrer})\" \"%{DATA:user_agent}\" (?:%{IP:proxy}|-) %{DATA:upstream_addr} %{NUMBER:upstream_request_time:float} %{NUMBER:upstream_response_time:float}",
            "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) \"%{DATA:referrer}\" \"%{DATA:user_agent}\" \"%{DATA:proxy}\""
        ] }
    }
    if [request] {
        urldecode {
            field => "request"
        }
        ruby {
            init => "@kname = ['url_path','url_arg']"
            code => "
                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('request').split('?'))])
                event.append(new_event)"
        }
        if [url_arg] {
            ruby {
                init => "@kname = ['key', 'value']"
                code => "event.set('url_args', event.get('url_arg').split('&').collect {|i| Hash[@kname.zip(i.split('='))]})"
            }
        }
    }
    geoip {
        source => "clientip"
    }
    useragent {
        source => "user_agent"
        target => "ua"
        remove_field => "user_agent"
    }
    date {
        match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
        locale => "en"
    }
    mutate {
        remove_field => ["message","timestamp","request","url_arg"]
    }
}

output {
    elasticsearch {
        hosts => "localhost:9200"
        index => "nginx-access-log-%{+YYYY.MM.dd}"
    }
#   stdout {
#       codec => rubydebug
#   }
}

 

To check that a configuration file is written correctly, test it first by starting Logstash like this:

/usr/share/logstash/bin/logstash -t -f /etc/logstash/conf.d/nginx.conf  # test the configuration file
Configuration OK
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_access.conf  # start logstash in the foreground
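
While iterating on a configuration, Logstash can also watch the file and reload it on change instead of being restarted each time:

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_access.conf --config.reload.automatic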

Start logstash as a service:

# systemctl start logstash

 Input plugins let Logstash read from a specific event source.

 Official docs: https://www.elastic.co/guide/en/logstash/current/input-plugins.html

 The event source can be stdin (keyboard input), a file, or Elasticsearch, Filebeat, Kafka, Redis and so on.

  • stdin: standard input
  • file: read data from a file
    file {
        path => ["/var/log/nginx/access.log"]  # path of the file(s) to read
        type => "nginx_access_log"
        start_position => "beginning"
    }
    # path also accepts globs such as /var/log/*.log or /var/log/**/*.log; a bare /var/log is treated as /var/log/*.log
    # type: a common option, used to select events in filter conditionals
    # start_position: where Logstash starts reading the file, "beginning" or "end"
    # Other frequently used options such as discover_interval, exclude, sincedb_path and sincedb_write_interval are described in the official docs; see the sketch after this list.
  • syslog: read system log messages over the network as events
    syslog {
        port => "514"
        type => "syslog"
    }
    # port: the port to listen on (listens on both TCP and UDP port 514)

    # Reading from syslog also requires configuring rsyslog:
    # cat /etc/rsyslog.conf   add one line:
    *.* @172.17.128.200:514   # forward logs to this address and port; Logstash listens there and reads new entries as they arrive
    # service rsyslog restart   # restart the log service

  • beats: receive events from Elastic Beats
    beats {
        port => 5044   # the port to listen on
    }
    # other options such as host are available

    # Reading from a beat requires configuring the beat side first so it outputs to Logstash.
    # vim /etc/filebeat/filebeat.yml
    ..........
    output.logstash:
      hosts: ["localhost:5044"]
  • kafka: read data from a kafka topic as events
    kafka {
        bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
        topics => ["access_log"]
        group_id => "logstash-file"
        codec => "json"
    }

    kafka {
        bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
        topics => ["weixin_log","user_log"]
        codec => "json"
    }

    # bootstrap_servers: list of Kafka broker URLs used to establish the initial connection to the cluster
    # topics: the list of kafka topics to subscribe to
    # group_id: identifier of the consumer group, defaults to logstash. Messages from one topic are distributed across all Logstash consumers sharing the same group_id
    # codec: a common option, the codec for decoding the input data

   There are many more input plugin types; see the official documentation for their options.
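
   As a composition of the extra file-input options mentioned above, a minimal sketch (the paths and interval values are illustrative, not prescriptive):

    input {
        file {
            path => ["/var/log/nginx/*.log"]
            exclude => "*.gz"                                  # skip rotated, compressed files
            discover_interval => 15                            # how often (seconds) to check path for new files
            sincedb_path => "/var/lib/logstash/sincedb_nginx"  # where read positions are persisted
            sincedb_write_interval => 15                       # how often (seconds) to flush read positions
            start_position => "beginning"
            type => "nginx_access_log"
        }
    }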

filter plugin: filter plugins perform intermediate processing on events.

  • grok: parse text and give it structure. Turns unstructured log data into structured, queryable fields via regular expressions.
    grok {
        match => { "message" => "^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$" }
    }
    This matches nginx log lines such as:
    # 203.202.254.16 - - [22/Jun/2018:16:12:54 +0800] "GET / HTTP/1.1" 200 3700 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/601.7.7 (KHTML, like Gecko) Version/9.1.2 Safari/601.7.7"
    # 220.181.18.96 - - [13/Jun/2015:21:14:28 +0000] "GET /blog/geekery/xvfb-firefox.html HTTP/1.1" 200 10975 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
  •  Note that grok can take several match patterns; if an earlier one fails to match, the next is tried. For example:
     grok {
         match => [
             "message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)",
             "message", "%{IP:clientip} - %{USER:user} \[%{HTTPDATE:raw_datetime}\] \"(?:%{WORD:verb} %{URI:request} HTTP/%{NUMBER:httpversion})\" (?:\"%{DATA:body}\" )?(?:\"%{DATA:cookie}\" )?%{NUMBER:response} (?:%{NUMBER:bytes:int}|-) \"%{DATA:referrer}\" \"%{DATA:agent}\" (?:(%{IP:proxy},? ?)*|-|unknown) (?:%{DATA:upstream_addr} |)%{NUMBER:request_time:float} (?:%{NUMBER:upstream_time:float}|-)"
         ]
     }

      grok syntax: %{SYNTAX:SEMANTIC}, i.e. %{pattern-name:custom-field-name}

      The official patterns repository provides many ready-made grok patterns: https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns

      grok debug tool: http://grokdebug.herokuapp.com

      Regular-expression debugging tool: https://www.debuggex.com/

      Writing patterns takes a fair amount of regex knowledge; one reference: https://www.jb51.net/tools/zhengze.html

      Custom patterns: (?<field_name>the pattern)

      For example, to match 2018/06/27 14:00:54:

          (?<datetime>\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d)

      which yields: "datetime": "2018/06/27 14:00:54"
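
      A custom pattern can be dropped straight into a grok match alongside named patterns; a minimal sketch (the level and msg fields are illustrative):

          grok {
              match => { "message" => "(?<datetime>\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d) %{LOGLEVEL:level} %{GREEDYDATA:msg}" }
          }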

 

  • date: date parsing. Parses a date out of a field, then stores it into @timestamp.
    [2018-07-04 17:43:35,503]
    grok {
          match => { "message" => "%{DATA:raw_datetime}" }
    }
    date {
           match => ["raw_datetime","YYYY-MM-dd HH:mm:ss,SSS"]
           remove_field => ["raw_datetime"]
    }

    # store raw_datetime into @timestamp, then delete raw_datetime

    # 24/Jul/2018:18:15:05 +0800
    date {
          match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
    }
  • mutate: general field handling: rename, remove, replace and modify fields.
    • convert: type conversion. Supported types: integer, float, integer_eu, float_eu, string and boolean
      filter {
          mutate {
      #       convert => ["response","integer","bytes","float"]  # array form, converting several fields at once
              convert => { "message" => "integer" }
          }
      }
      # test ------->
      {
                "host" => "localhost",
             "message" => 123,    # unquoted: integer type
          "@timestamp" => 2018-06-26T02:51:08.651Z,
            "@version" => "1"
      }
    • split: split a string into an array on a delimiter
      mutate {
          split => { "message" => "," }
      }
      #---------->
      aaa,bbb
      {
          "@timestamp" => 2018-06-26T02:40:19.678Z,
            "@version" => "1",
                "host" => "localhost",
             "message" => [
              [0] "aaa",
              [1] "bbb"
          ]
      }
      192,128,1,100
      {
              "host" => "localhost",
           "message" => [
            [0] "192",
            [1] "128",
            [2] "1",
            [3] "100"
         ],
        "@timestamp" => 2018-06-26T02:45:17.877Z,
          "@version" => "1"
      }
    • merge: merge fields: array with string, or string with string
      filter {
          mutate {
              add_field => { "field1" => "value1" }
          }
          mutate {
                split => { "message" => "." }   # split the message field on "."
          }
          mutate {
              merge => { "message" => "field1" }   # append the field1 field into message
          }
      }
      #--------------->
      abc
      {
             "message" => [
              [0] "abc",
              [1] "value1"
          ],
          "@timestamp" => 2018-06-26T03:38:57.114Z,
              "field1" => "value1",
            "@version" => "1",
                "host" => "localhost"
      }

      abc,.123
      {
             "message" => [
              [0] "abc,",
              [1] "123",
              [2] "value1"
          ],
          "@timestamp" => 2018-06-26T03:38:57.114Z,
              "field1" => "value1",
            "@version" => "1",
                "host" => "localhost"
      }
    • rename: rename a field
      filter {
          mutate {
              rename => { "message" => "info" }
          }
      }
      #-------->
      123
      {
          "@timestamp" => 2018-06-26T02:56:00.189Z,
                "info" => "123",
            "@version" => "1",
                "host" => "localhost"
      }
    • remove_field: remove fields
      mutate {
          remove_field => ["message","datetime"]
      }
    • join: join an array with a delimiter; non-array fields are left untouched
      mutate {
              split => { "message" => ":" }
      }
      mutate {
              join => { "message" => "," }
      }
      ------>
      abc:123
      {
          "@timestamp" => 2018-06-26T03:55:41.426Z,
             "message" => "abc,123",
                "host" => "localhost",
            "@version" => "1"
      }
      aa:cc
      {
          "@timestamp" => 2018-06-26T03:55:47.501Z,
             "message" => "aa,cc",
                "host" => "localhost",
            "@version" => "1"
      }
    • gsub: replace field values using a regex or string. Only works on string fields.
      mutate {
              gsub => ["message","/","_"]   # replace / with _
      }
      ------>
      a/b/c/
      {
            "@version" => "1",
             "message" => "a_b_c_",
                "host" => "localhost",
          "@timestamp" => 2018-06-26T06:20:10.811Z
      }
    • update: update a field. If the field does not exist, nothing happens.
      mutate {
              add_field => { "field1" => "value1" }
      }
      mutate {
              update => {
                  "field1" => "v1"
                  "field2" => "v2"    # field2 does not exist, so nothing happens
              }
      }
      ---------------->
      {
          "@timestamp" => 2018-06-26T06:26:28.870Z,
              "field1" => "v1",
                "host" => "localhost",
            "@version" => "1",
             "message" => "a"
      }
    • replace: update a field; if the field does not exist, it is created
      mutate {
              add_field => { "field1" => "value1" }
      }
      mutate {
              replace => {
                  "field1" => "v1"
                  "field2" => "v2"
              }
      }
      ---------------------->
      {
             "message" => "1",
                "host" => "localhost",
          "@timestamp" => 2018-06-26T06:28:09.915Z,
              "field2" => "v2",        # field2 did not exist, so it was created
            "@version" => "1",
              "field1" => "v1"
      }
  • geoip: add geolocation information about an IP address, based on data from the Maxmind GeoLite2 database
     geoip {
         source => "clientip"
         database => "/tmp/GeoLiteCity.dat"
     }
  • ruby: the ruby plugin can execute arbitrary Ruby code
    filter {
        urldecode {
            field => "message"
        }
        ruby {
            init => "@kname = ['url_path','url_arg']"
            code => "
                new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('?'))])
                event.append(new_event)"
        }
        if [url_arg] {
            kv {
                source => "url_arg"
                field_split => "&"
                target => "url_args"
                remove_field => ["url_arg","message"]
            }
        }
    }
    # ruby plugin
    # split the message field on "?" into url_path and url_arg
    -------------------->
    www.test.com?test
    {
           "url_arg" => "test",
              "host" => "localhost",
          "url_path" => "www.test.com",
           "message" => "www.test.com?test",
          "@version" => "1",
        "@timestamp" => 2018-06-26T07:31:04.887Z
    }
    www.test.com?title=elk&content=学习elk
    {
          "url_args" => {
              "title" => "elk",
            "content" => "学习elk"
        },
              "host" => "localhost",
          "url_path" => "www.test.com",
          "@version" => "1",
        "@timestamp" => 2018-06-26T07:33:54.507Z
    }
  • urldecode: decode URL-encoded fields; useful for fixing garbled non-ASCII characters in URLs
     urldecode {
         field => "message"
     }
     # field: the field the urldecode filter decodes; defaults to "message"
     # charset (optional): the character encoding the filter uses; defaults to UTF-8
  • kv: split a string into key/value pairs on a given delimiter
    kv {
            prefix => "url_"       # prefix added to every parsed key
            target => "url_ags"    # put the parsed key-value pairs under this field
            source => "message"    # the field to parse
            field_split => "&"     # the delimiter
            remove_field => "message"
    }
    -------------------------->
    a=1&b=2&c=3
    {
              "host" => "localhost",
           "url_ags" => {
            "url_c" => "3",
            "url_a" => "1",
            "url_b" => "2"
        },
          "@version" => "1",
        "@timestamp" => 2018-06-26T07:07:24.557Z
    }
  • useragent: add information about the user agent, such as family, operating system, version and device
    if [agent] != "-" {
      useragent {
        source => "agent"
        target => "ua"
        remove_field => "agent"
      }
    }
    # the if statement applies the plugin only when the agent field is not "-"
    # source (required): the field holding the user agent string
    # target: store the useragent information under the ua field; if unset, the fields are written at the top level of the event

Logstash comparison operators

  comparison:  ==, !=, <, >, <=, >=
  regexp:      =~, !~ (checks a pattern on the right against a string value on the left)
  inclusion:   in, not in

  Supported boolean operators: and, or, nand, xor

  Supported unary operator: !
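
  These operators appear in conditionals inside filter and output blocks; a minimal sketch, with field names borrowed from the nginx example above:

    filter {
        if [response] >= 500 {
            mutate { add_tag => ["server_error"] }           # tag 5xx responses
        } else if [verb] in ["GET", "HEAD"] and [clientip] =~ /^10\./ {
            mutate { add_tag => ["internal_read"] }          # read requests from the 10.x.x.x range
        }
    }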

output plugin: output plugins send events to a particular destination.

  • stdout: standard output. Prints events to the screen.
    output {
        stdout {
            codec => "rubydebug"
        }
    }
  • file: write events to a file
        file {
           path => "/data/logstash/%{host}/application"
           codec => line { format => "%{message}" }
        }
  • kafka: send events to kafka
        kafka {
            bootstrap_servers => "localhost:9092"
            topic_id => "test_topic"  # required setting: the topic to produce messages to
        }
  • elasticsearch: store logs in es
        elasticsearch {
            hosts => "localhost:9200"
            index => "nginx-access-log-%{+YYYY.MM.dd}"
        }
    # index: the index events are written to. Creating one index per day makes it easy to delete old data and to search logs by time range; see the routing sketch below.
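
    Outputs can also be wrapped in conditionals to route events to different indices by type; a minimal sketch reusing the types from the input examples above:

    output {
        if [type] == "nginx_access_log" {
            elasticsearch {
                hosts => "localhost:9200"
                index => "nginx-access-log-%{+YYYY.MM.dd}"
            }
        } else {
            elasticsearch {
                hosts => "localhost:9200"
                index => "other-log-%{+YYYY.MM.dd}"   # illustrative catch-all index name
            }
        }
    }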

 A note on codec plugins (encoder/decoder plugins):

  A codec is essentially a stream filter that runs as part of an input or output plugin. The stdout plugin in the output example above already uses one (rubydebug).

  • multiline codec plugin: merges multiple lines into one event; needed for stack traces and other logs that contain newlines
    input {
      stdin {
        codec => multiline {
          pattern => "pattern, a regexp"    # a regexp; matched lines are handled according to the next two options
          negate => "true" or "false"       # default false: act on lines that match the pattern; if true, act on lines that do NOT match
          what => "previous" or "next"      # context: merge the matched line into the previous line or the next line
        }
      }
    }
    codec => multiline {
        pattern => "^\s"
        what => "previous"
    }
    # merge every line that starts with whitespace into the previous line

    codec => multiline {
        # Grok pattern names are valid! :)
        pattern => "^%{TIMESTAMP_ISO8601} "
        negate => true
        what => "previous"
    }
    # merge every line that does not start with this timestamp format into the previous line

    codec => multiline {
       pattern => "\\$"
       what => "next"
    }
    # merge every line that ends with a backslash into the next line
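
    Since a codec attaches to whichever input defines it, the same multiline rules work with the file input; a minimal sketch for timestamp-prefixed application logs with stack traces (the path is illustrative):

    input {
      file {
        path => "/var/log/app/app.log"
        start_position => "beginning"
        codec => multiline {
          pattern => "^%{TIMESTAMP_ISO8601} "
          negate => true
          what => "previous"    # continuation lines (e.g. stack frames) join the line that started the event
        }
      }
    }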
