搭建办公环境elasticsearch日志分析系统(代码片段)

author author     2023-05-05     212

关键词:

搭建办公环境ElasticSearch 日志分析系统

? 计划将公司的防火墙+交换机+服务器(centos7)+ Vmware+Windows server纳入到监控范围,所以开启了ELK监控之旅。

? 本文采用ELK架构栈进行组建,万丈高楼平地起,虽然开始比较简陋,后期会不断完善这个日志分析系统。

? 全文框架如下:

? Hillstone: syslog→logstash→elasticsearch→kibana

? H3C: syslog→logstash→elasticsearch→kibana

? ESXI: syslog→logstash→elasticsearch→kibana

? Vcenter: syslog→logstash→elasticsearch→kibana

? Windows server: winlogbeat→logstash→elasticsearch→kibana

? linux server: filebeate→lasticsearch→kibana

? ELK说明:

? ELK1: 192.168.20.18:9200

? ELK2: 192.168.20.19:9200

? 规划:

? Logstash: 192.168.20.18

? 不同服务根据端口不同进行标记,创建不同的索引。

?

1 Hillstone部分

1.1 Hillstone配置

1.1.1 配置步骤

? 本文通过web界面配置,当然也能进行命令行配置,具体配置请参考链接。

? 找到Stoneos-日志管理-log配置-日志管理器,配置服务器日志:
? 主机名: 192.168.20.18
? 绑定方式: 虚拟路由器 trust-vr
? 协议: UDP
? 端口: 514

? //我使用的root运行,非root账号使用端口在1024以上。

? 技术图片

技术图片

技术图片

1.1.2 参考链接

elk收集数据中心网络设备日志

hillstone常见配置命令

1.2 添加logstash 配置文件

1.2.1 创建配置测试文件

cat  > /data/config/test-hillstone.config << EOF
input
        udp port => 518 type => "Hillstone"

output 
    stdout  codec=> rubydebug 

EOF
logstash -f test-hillstone.config

1.2.2 选取其中一条日志进行分析调试。

<190>Nov 29 17:24:52 1404726150004842(root) 44243624 Traffic@FLOW: SESSION: 10.6.2.43:49608->192.168.20.160:11800(TCP), application TCP-ANY, interface tunnel6, vr trust-vr, policy 1, user -@-, host -, send packets 1,send bytes 74,receive packets 1,receive bytes 110,start time 2019-11-29 17:24:50,close time 2019-11-29 17:24:52,session end,TCP RST
u0000

1.2.3Logstash分析说明

    可以通过grok debug网站进行自动匹配(https://grokdebug.herokuapp.com/discover?#),再根据分析出来的日志,进行二次调整。
    同样晚上有很多案例进行参考,可以先去参考别人想法,再补充自己的想法。
    关于grok部分详细讲解,请参考https://coding.imooc.com/class/181.html,老师讲的很棒,当然吾爱破解论坛和B站,有免费版。

1.2.4 Logstash综合配置

? 只选取了会话+NAT部分

cat  > /data/config/hillstone.config<< EOF
input
        udp 
        port => 518 
        type => "hillstone"
        


filter 
    grok 
##流量日志

#SESSION会话结束日志
        match =>  "message" => "<%BASE10NUM:syslog_pri>%SYSLOGTIMESTAMP:timestamp %BASE10NUM:serial(%WORD:ROOT) %DATA:logid %DATA:Sort@%DATA:Class: %DATA:module: %IPV4:srcip:%BASE10NUM:srcport->%IPV4:dstip:%WORD:dstport(%DATA:protocol), application %USER:app, interface %DATA:interface, vr %USER:vr, policy %DATA:policy, user %USERNAME:user@%DATA:AAAserver, host %USER:HOST, send packets %BASE10NUM:sendPackets,send bytes %BASE10NUM:sendBytes,receive packets %BASE10NUM:receivePackets,receive bytes %BASE10NUM:receiveBytes,start time %TIMESTAMP_ISO8601:startTime,close time %TIMESTAMP_ISO8601:closeTime,session %WORD:state,%GREEDYDATA:reason"
#SESSION会话开始日志
        match =>  "message" => "<%BASE10NUM:syslog_pri>%SYSLOGTIMESTAMP:timestamp %BASE10NUM:serial(%WORD:ROOT) %DATA:logid %DATA:Sort@%DATA:Class: %DATA:module: %IPV4:srcip:%BASE10NUM:srcport->%IPV4:dstip:%WORD:dstport(%DATA:protocol), interface %DATA:interface, vr %DATA:vr, policy %DATA:policy, user %USERNAME:user@%DATA:AAAserver, host %USER:HOST, session %WORD:state%GREEDYDATA:reason"
#SNAT日志
        match =>  "message" => "<%BASE10NUM:syslog_pri>%SYSLOGTIMESTAMP:timestamp %BASE10NUM:serial(%WORD:ROOT) %DATA:logid %DATA:Sort@%DATA:Class: %DATA:module: %IPV4:srcip:%BASE10NUM:srcport->%IPV4:dstip:%WORD:dstport(%DATA:protocol), %WORD:state to %IPV4:snatip:%BASE10NUM:snatport, vr %DATA:vr, user %USERNAME:user@%DATA:AAAserver, host %DATA:HOST, rule %BASE10NUM:rule"
#DNAT日志
                match =>  "message" => "<%BASE10NUM:syslog_pri>%SYSLOGTIMESTAMP:timestamp %BASE10NUM:serial(%WORD:ROOT) %DATA:logid %DATA:Sort@%DATA:Class: %DATA:module: %IPV4:srcip:%BASE10NUM:srcport->%IPV4:dstip:%WORD:dstport(%DATA:protocol), %WORD:state to %IPV4:dnatip:%BASE10NUM:dnatport, vr %DATA:vr, user %USERNAME:user@%DATA:AAAserver, host %DATA:HOST, rule %BASE10NUM:rule"
    
    mutate 
                lowercase => [ "module" ]
                remove_field => ["host", "message", "ROOT", "HOST", "serial", "syslog_pri", "timestamp", "mac", "AAAserver", "user"]
        

output 
    elasticsearch 
    hosts => "192.168.20.18:9200"  #elasticsearch服务地址
    index => "logstash-hillstone-%module-%state-%+YYYY.MM.dd"
    

EOF
3.1.2.3 参考文件

hillstone中logstash配置参考

elk收集数据中心网络设备日志

山石hillstone Logstash配置流程

ELK从入门到实践

2 H3C交换机部分

2.1 H3C Syslog转发

2.1.1 H3C交换机配置

? 本文通过命令行进行配置,具体配置请参考链接。

  • ? 将交换机的时间设置正确

    clock datetime hh:mm:ss year/month/day
    save force
  • 设置交换机syslog转发。

    system-view
    info-center enable // 开启info-center
    info-center loghost 192.168.20.18 port 516 facility local8 // 设置日志主机/端口/日志级别
    info-center source  default  loghost  level   informational  //设置日志级别
    save force
    

2.1.2 参考链接

? H3C设置时间

? H3C网络日志转发

? H3C配置日志主机

2.2 添加logstash 配置文件

2.2.1 创建配置测试文件

cat  > /data/config/test-h3c.congfig << EOF
input
        udp port => 516 type => "h3c"

output 
    stdout  codec=> rubydebug 

EOF

2.2.2 选取一条日志进行分析调试。

<190>Nov 30 16:27:23 1404726150004842(root) 44243622 Traffic@FLOW: SESSION: 10.6.4.178:48150->192.168.20.161:11800(TCP), interface tunnel6, vr trust-vr, policy 1, user -@-, host -, session start
u0000

2.2.3 mutate基本用法说明

? 参考链接: https://blog.csdn.net/qq_34624315/article/details/83013531

2.2.4 综合配置。

cat > H3C.conf <<EOF
###h3c 日志过滤
    grok 
        match =>  "message" => "<%BASE10NUM:syslog_pri>%SYSLOGTIMESTAMP:timestamp %DATA:year %DATA:hostname \%\%%DATA:ddModuleName/%POSINT:severity/%DATA:brief: %GREEDYDATA:reason"        
        add_field => "severity_code" => "%severity"          
        
    mutate     
        gsub => [
            "severity", "0", "Emergency",
            "severity", "1", "Alert",
            "severity", "2", "Critical",
            "severity", "3", "Error",
            "severity", "4", "Warning",
            "severity", "5", "Notice",
            "severity", "6", "Informational",
            "severity", "7", "Debug"
        ]
        remove_field => ["message", "syslog_pri"]
        

output 
    stdout  codec=> rubydebug 
#    elasticsearch 
#    hosts => "192.168.20.18:9200"  #elasticsearch服务地址
#    index => "logstash-h3c-%+YYYY.MM.dd"
#    

EOF

2.2.5 参考文件

交换路由等网络设备logstash配置

logstash配置文件

3 Vmware部分

3.1 Esxi日志收集部分

? 主要是收集ESXI机器日志,方便进行安全日志分析;

? 主要通过syslog进行日志收集,再通过ELK栈提供的logstash进行分析

? ESXI-syslog--logstash--elasticsearch

3.1.1 Esxi机器syslog服务器配置

3.1.1.1 配置步骤

? 本文通过客户端配置,当然也能进行web配置,方法基本一致,具体配置请参考链接。

  • ? 开启syslog服务:

    打开esxi客户端-选择主机-主机配置-高级设置-syslog-设置远程syslog服务器为: udp://192.168.20.18:514

    技术图片

  • 允许防火墙放行。

    打开esxi客户端-选择主机-主机配置-安全配置文件-防火墙-编辑-勾选syslog服务器,点击确定。

    技术图片

    技术图片

    3.1.1.2 参考链接

    Vmware Esxi syslog配置

    在Esxi上配置syslog

    Monitoring VMWare ESXi with the ELK Stack

    3.1.2 添加logstash 配置文件

    3.1.2.1 创建配置测试文件
    cat  > /data/config/test-vmware.config << EOF
    input
          udp port => 514 type => "Hillstone"
    
    output 
      stdout  codec=> rubydebug 
    
    EOF
    logstash -f test-vmware.config
    3.1.2.2 选取其中一条日志进行分析调试。
    <167>2019-12-03T07:36:11.689Z localhost.localdomain Vpxa: verbose vpxa[644C8B70] [Originator@6876 sub=VpxaHalCnxHostagent opID=WFU-2d14bc3d] [WaitForUpdatesDone] Completed callback
    
    3.1.2.3 结合网上的综合案例对测试文件进行配置改造。
    cat > vmware.conf <<EOF
    input
          udp 
          port => 514
          type => "vmware"
    
    
    
    filter 
    if "vmware" in [type] 
        grok 
            break_on_match => true
            match => [
                "message", "<%POSINT:syslog_pri>%TIMESTAMP_ISO8601:syslog_timestamp %SYSLOGHOST:syslog_hostname %SYSLOGPROG:syslog_program: (?<message-body>(?<message_system_info>(?:[%DATA:message_thread_id %DATA:syslog_level ‘%DATA:message_service‘ ?%DATA:message_opID])) [%DATA:message_service_info] (?<syslog_message>(%GREEDYDATA)))",
                "message", "<%POSINT:syslog_pri>%TIMESTAMP_ISO8601:syslog_timestamp %SYSLOGHOST:syslog_hostname %SYSLOGPROG:syslog_program: (?<message-body>(?<message_system_info>(?:[%DATA:message_thread_id %DATA:syslog_level ‘%DATA:message_service‘ ?%DATA:message_opID])) (?<syslog_message>(%GREEDYDATA)))",
                "message", "<%POSINT:syslog_pri>%TIMESTAMP_ISO8601:syslog_timestamp %SYSLOGHOST:syslog_hostname %SYSLOGPROG:syslog_program: %GREEDYDATA:syslog_message"
            ]
        
         date 
                          match => [ "syslog_timestamp", "YYYY-MM-ddHH:mm:ss", "ISO8601" ]
                  
        mutate 
            replace => [ "@source_host", "%syslog_hostname" ]
        
        mutate 
            replace => [ "@message", "%syslog_message" ]
        
        mutate 
            remove_field => ["@source_host","program","@timestamp","syslog_hostname","@message"]
        
        if "Device naa" in [message] 
            grok 
                break_on_match => false
                match => [
                    "message", "Device naa.%WORD:device_naa performance has %WORD:device_status%GREEDYDATA of %INT:datastore_latency_from%GREEDYDATA to %INT:datastore_latency_to",
                    "message", "Device naa.%WORD:device_naa performance has %WORD:device_status%GREEDYDATA from %INT:datastore_latency_from%GREEDYDATA to %INT:datastore_latency_to"
                ]
            
        
        if "connectivity issues" in [message] 
            grok 
                match => [
                    "message", "Hostd: %GREEDYDATA : %DATA:device_access to volume %DATA:device_id %DATA:datastore (following|due to)"
                ]
            
        
        if "WARNING" in [message] 
            grok 
                match => [
                    "message", "WARNING: %GREEDYDATA:vmware_warning_msg"
                ]
            
        
    
    
    output 
      elasticsearch 
      hosts => "192.168.20.18:9200"  #elasticsearch服务地址
      index => "logstash-vmware-%+YYYY.MM.dd"
      
    #    stdout  codec=> rubydebug 
    
    EOF
    
    3.1.2.3 参考文件

    mutate基本用法

    基本logstash配置文件参考

    vmware and syslog

    logstash VCSA6.0

    filter plugins

    3.2 Vcenter日志收集部分

    3.2.1 Vcenter-syslog服务器配置

    ? 主要是收集vcsa机器日志,方便进行安全日志分析;

    ? 主要通过syslog进行日志收集,再通过ELK栈提供的logstash进行分析

    ? VCSA-Syslog--Logstash--Elasticsearch

    3.2.1.1 配置步骤

    ? 打开VCSA的管理后台URL: http://192.168.20.90:5480,输入账号和密码(开机root和密码)--点击syslog配置中心,输入syslog配置信息

    技术图片

    技术图片

    技术图片

    3.2.1.2 参考链接

    Vmware Esxi syslog配置

    VCSA 6.5 forward to multiple syslog

    VCSA syslog

    3.2.2 添加logstash 配置文件

    3.2.2.1 创建测试配置
    input
          udp 
          port => 1514
          type => "vcenter"
    
    
    
    output 
      stdout  codec=> rubydebug 
    
    
    3.2.2.2 选取一条日志进行分析调试
    <14>1 2019-12-05T02:44:17.640474+00:00 photon-machine vpxd 4035 - -  Event [4184629] [1-1] [2019-12-05T02:44:00.017928Z] [vim.event.UserLoginSessionEvent] [info] [root] [Datacenter] [4184629] [User root@192.168.20.17 logged in as pyvmomi Python/3.6.8 (Linux; 3.10.0-957.el7.x86_64; x86_64)]
    
    3.2.2.3 结合网上的综合案例对配置文件进行配置改造。
    cat > vcenter.conf <<EOF
    input
          udp 
          port => 1514
          type => "vcenter"
    
    
    filter 
    if "vcenter" in [type] 
      
      grok 
        break_on_match => true
        match => [
          "message", "<%NONNEGINT:syslog_pri>%NONNEGINT:syslog_ver +(?:%TIMESTAMP_ISO8601:syslog_timestamp|-) +(?:%HOSTNAME:syslog_hostname|-) +(-|%SYSLOG5424PRINTASCII:syslog_program) +(-|%SYSLOG5424PRINTASCII:syslog_proc) +(-|%SYSLOG5424PRINTASCII:syslog_msgid) +(?:%SYSLOG5424SD:syslog_sd|-|) +%GREEDYDATA:syslog_msg"
        ]
      
      date 
           match => [ "syslog_timestamp", "YYYY-MM-ddHH:mm:ss,SSS", "YYYY-MM-dd HH:mm:ss,SSS", "ISO8601" ]
           #timezone => "UTC" #For vCenter Appliance
           #timezone => "Asia/Shanghai"
      
    
     mutate 
           remove_field => ["syslog_ver", "syslog_pri"]
                          
    
    
    output 
      elasticsearch 
      hosts => "192.168.20.18:9200"  #elasticsearch服务地址
      index => "logstash-vcenter-%+YYYY.MM.dd"
      
    #    stdout  codec=> rubydebug 
    
    
    EOF
    
    3.2.2.4 参考文件

    mutate基本用法

    基本logstash配置文件参考

    vmware and syslog

    logstash VCSA6.0

    4 Windows server部分

    4.1 windows server日志收集部分

    ? 主要是收集AD域日志,方便进行安全日志分析;

    ? 主要通过ELK栈提供的winlogbeat进行收集

    ? winlogbeat--logstash--elasticsearch

    4.1.1 windows服务器配置

    • 下载安装

    下载连接地址:https://www.elastic.co/cn/downloads/beats/winlogbeat

    技术图片

    • 配置:

    将解压后的文件放到“C:Program Files”,重命名为winlogbeat

    技术图片

    • 安装 :

    命令安装

    技术图片

    • 编辑:

    编辑winlogbeat.yml文件

    winlogbeat.event_logs:
      - name: Application
      - name: Security
      - name: System
    
    output.logstash:
      enbaled: true
      hosts: ["192.168.20.18:5044"]
    
    logging.to_files: true
    logging.files:
      path: D:ProgramDatawinlogbeatLogs
    logging.level: info
    • 测试配置文件
    PS C:Program FilesWinlogbeat> .winlogbeat.exe test config -c .winlogbeat.yml -e
    • 启动:

    启动winlogbeat

    powershell命令行启动:

    PS C:Program FilesWinlogbeat> Start-Service winlogbeat

    powershell命令行关闭

    PS C:Program FilesWinlogbeat> Stop-Service winlogbeat
    • 导入模板

    导入winlogindex模板,因为我们使用的logstash,所以需要手动导入。

    PS > .winlogbeat.exe setup --index-management -E output.logstash.enabled=false -E ‘output.elasticsearch.hosts=["192.168.20.18"]‘
    • 导入dashboard

    导入kibana-dashboard,因为我们使用的logstash,所以需要手动导入。

    PS > .winlogbeat.exe setup -e  -E output.logstash.enabled=false   -E output.elasticsearch.hosts=[‘192.168.20.18:9200‘]   -E setup.kibana.host=192.168.20.18:5601

    4.1.2 参考链接

    ? windows 上winlogbeat安装

    ? 官方手册分析

    4.2 添加logstash 配置文件

    4.2.1 创建测试配置文件

    cat  > /data/config/test-windows.config << EOF
    input 
    beats 
      port => 5044
    
    
    
    output 
    stdout  codec=> rubydebug 
    
    EOF
    logstash -f test-windows.config

    4.2.2 创建配置文件

    创建正式配置文件,查看内容(因为已有模板,所以不错其他修改处理)

    cat  > /data/config/windows.config << EOF
    input 
    beats 
      port => 5044
    
    
    
    output 
    elasticsearch 
      hosts => ["http:192.168.20.18:9200"]
      index => "%[@metadata][beat]-%[@metadata][version]" 
    
    
    EOF
    logstash -f windows.config
    

    4.2.3 参考文件

    beats input plugin

    4.3查看Discover

    ? 因为已经winlogbeat是elk栈的标准模块,已经被定义,所以我们不再自行定义。

    ? 直接打开搜索winlogbaet*的index。

    技术图片

    4.4 查看dashboard

    ? 因为已经winlogbeat是elk栈的标准模块,已经被定义,所以我们不再自行定义。

    ? 直接打开搜索winlogbaet*的dashboard

    技术图片

    技术图片

    5 Linux server部分

    5.1 linux 系统日志收集部分

    ? 主要是收集linux上的开关机日志,安全日志;

    ? 主要通过ELK栈提供的filebeat进行收集

    ? filebeat-filebeat-module--elasticsearch

    ? 技术图片

    5.1.1 安装部署

    • 下载

    官网下载,连接地址:https://www.elastic.co/cn/downloads/beats/filebeat

    技术图片

    • 安装 :

    命令行安装

    curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.5.0-linux-x86_64.tar.gz
    tar xzvf filebeat-7.5.0-linux-x86_64.tar.gz
    • 查看布局

    查看filebeat目录布局

    技术图片

    • 编辑:

    编辑filebeat.yml文件

    filebeat.inputs:
    - type: log
      enabled: true
      paths:
        - /var/log/*.log
        #- c:programdataelasticsearchlogs*
    output.elasticsearch:
      hosts: ["192.168.20.18:9200"]
    setup.kibana:
      host: "192.168.20.18:5601"
    
    • 启动:

    运行filebeat文件

    filebeat  -c filebeat.yml -e

    5.1.2 参考链接

    windows 上winlogbeat安装

    官方手册分析

    5.2 修改filebeat配置文件

    5.2.1索引名称

    关闭ilm声明周期管理

    setup.ilm.enabled: false

    更改索引名称

    setup.template.overwrite: true
    output.elasticsearch.index: "systemlog-7.3.0-%+yyyy.MM.dd"
    setup.template.name: "systemlog"
    setup.template.pattern: "systemlog-*"

    修改预先构建的kibana仪表盘

    setup.dashboards.index: "systemlog-*"

    5.2.1 开启system模块

    ./filebeat modules enable system
    ./filebeat modules list

    5.2.3 重新初始化环境

    ./filebeat setup --template -e -c  filebeat.yml

    5.2.4 配置模块,修改配置文件

    filebeat.modules:
    - module: system
    syslog:
      enabled: true
    #默认位置/var/log/messages* /var/log/syslog*
    auth:
      enabled: true
    #默认位置/var/log/auth.log*  /var/log/secure*
    output.elasticsearch:
    hosts: ["192.168.20.18:9200"]
    setup.kibana:
    host: "192.168.20.18:5601"

    5.2.5 重新启动filebeats,初始化模块

    ./filebeat setup -e -c  filebeat.yml

    5.2.6 参考文件

    beats input plugin

    filebeat模块与配置

    system module

    5.3 查看discover

    技术图片

    5.4 查看dashboard

    技术图片

    6 整合logstash配置

    6.1 整合logstash配置文件

    input
    udp 
      port => 516
      type => "h3c"
    
    
    input
    udp 
      port => 518
      type => "hillstone"
    
    
    input
    udp 
      port => 514
      type => "vmware"
    
    
    input
    udp 
      port => 1514
      type => "vcenter"
    
    
    input 
    beats 
      port => 5044
      type => "windows"
    
    
    filter 
    if [type] == "hillstone" 
    grok 
      match =>  "message" => "<%BASE10NUM:syslog_pri>%SYSLOGTIMESTAMP:timestamp %BASE10NUM:serial(%WORD:ROOT) %DATA:logid %DATA:Sort@%DATA:Class: %DATA:module: %IPV4:srcip:%BASE10NUM:srcport->%IPV4:dstip:%WORD:dstport(%DATA:protocol), application %USER:app, interface %DATA:interface, vr %USER:vr, policy %DATA:policy, user %USERNAME:user@%DATA:AAAserver, host %USER:HOST, send packets %BASE10NUM:sendPackets,send bytes %BASE10NUM:sendBytes,receive packets %BASE10NUM:receivePackets,receive bytes %BASE10NUM:receiveBytes,start time %TIMESTAMP_ISO8601:startTime,close time %TIMESTAMP_ISO8601:closeTime,session %WORD:state,%GREEDYDATA:reason"
      match =>  "message" => "<%BASE10NUM:syslog_pri>%SYSLOGTIMESTAMP:timestamp %BASE10NUM:serial(%WORD:ROOT) %DATA:logid %DATA:Sort@%DATA:Class: %DATA:module: %IPV4:srcip:%BASE10NUM:srcport->%IPV4:dstip:%WORD:dstport(%DATA:protocol), interface %DATA:interface, vr %DATA:vr, policy %DATA:policy, user %USERNAME:user@%DATA:AAAserver, host %USER:HOST, session %WORD:state%GREEDYDATA:reason"
      match =>  "message" => "<%BASE10NUM:syslog_pri>%SYSLOGTIMESTAMP:timestamp %BASE10NUM:serial(%WORD:ROOT) %DATA:logid %DATA:Sort@%DATA:Class: %DATA:module: %IPV4:srcip:%BASE10NUM:srcport->%IPV4:dstip:%WORD:dstport(%DATA:protocol), %WORD:state to %IPV4:snatip:%BASE10NUM:snatport, vr %DATA:vr, user %USERNAME:user@%DATA:AAAserver, host %DATA:HOST, rule %BASE10NUM:rule"                                                                                                                                                         match =>  "message" => "<%BASE10NUM:syslog_pri>%SYSLOGTIMESTAMP:timestamp %BASE10NUM:serial(%WORD:ROOT) %DATA:logid %DATA:Sort@%DATA:Class: %DATA:module: %IPV4:srcip:%BASE10NUM:srcport->%IPV4:dstip:%WORD:dstport(%DATA:protocol), %WORD:state to %IPV4:dnatip:%BASE10NUM:dnatport, vr %DATA:vr, user %USERNAME:user@%DATA:AAAserver, host %DATA:HOST, rule %BASE10NUM:rule"
    
    mutate 
    lowercase => [ "module" ]
    remove_field => ["host", "message", "ROOT", "HOST", "serial", "syslog_pri", "timestamp", "mac", "AAAserver", "user"]
    
    
    if [type] == "h3c" 
      grok 
        match =>  "message" => "<%BASE10NUM:syslog_pri>%SYSLOGTIMESTAMP:timestamp %DATA:year %DATA:hostname \%\%%DATA:ddModuleName/%POSINT:severity/%DATA:brief: %GREEDYDATA:reason" 
      add_field => "severity_code" => "%severity"
    
    mutate 
      gsub => [
    "severity", "0", "Emergency",
    "severity", "1", "Alert",
    "severity", "2", "Critical",
    "severity", "3", "Error",
    "severity", "4", "Warning",
    "severity", "5", "Notice",
    "severity", "6", "Informational",
    "severity", "7", "Debug"
    ]
      remove_field => ["message", "syslog_pri"]
    
    
    if [type] == "vmware" 
    grok 
    break_on_match => true
    match => [
    "message", "<%POSINT:syslog_pri>%TIMESTAMP_ISO8601:syslog_timestamp %SYSLOGHOST:syslog_hostname %SYSLOGPROG:syslog_program: (?<message-body>(?<message_system_info>(?:[%DATA:message_thread_id %DATA:syslog_level ‘%DATA:message_service‘ ?%DATA:message_opID])) [%DATA:message_service_info] (?<syslog_message>(%GREEDYDATA)))",
    "message", "<%POSINT:syslog_pri>%TIMESTAMP_ISO8601:syslog_timestamp %SYSLOGHOST:syslog_hostname %SYSLOGPROG:syslog_program: (?<message-body>(?<message_system_info>(?:[%DATA:message_thread_id %DATA:syslog_level ‘%DATA:message_service‘ ?%DATA:message_opID])) (?<syslog_message>(%GREEDYDATA)))",
    "message", "<%POSINT:syslog_pri>%TIMESTAMP_ISO8601:syslog_timestamp %SYSLOGHOST:syslog_hostname %SYSLOGPROG:syslog_program: %GREEDYDATA:syslog_message"
    ]
    
    date 
    match => [ "syslog_timestamp", "YYYY-MM-ddHH:mm:ss", "ISO8601" ]
    
    mutate 
    replace => [ "@source_host", "%syslog_hostname" ]
    
    mutate 
    replace => [ "@message", "%syslog_message" ]
    
    mutate 
    remove_field => ["@source_host","program","syslog_hostname","@message"]
    
    
    if [type] == "vcenter" 
    grok 
    break_on_match => true
    match => [
    "message", "<%NONNEGINT:syslog_pri>%NONNEGINT:syslog_ver +(?:%TIMESTAMP_ISO8601:syslog_timestamp|-) +(?:%HOSTNAME:syslog_hostname|-) +(-|%SYSLOG5424PRINTASCII:syslog_program) +(-|%SYSLOG5424PRINTASCII:syslog_proc) +(-|%SYSLOG5424PRINTASCII:syslog_msgid) +(?:%SYSLOG5424SD:syslog_sd|-|) +%GREEDYDATA:syslog_msg"
    ]
    
    date 
    match => [ "syslog_timestamp", "YYYY-MM-ddHH:mm:ss,SSS", "YYYY-MM-dd HH:mm:ss,SSS", "ISO8601" ]
    
    mutate 
    remove_field => ["syslog_ver", "syslog_pri"]
    
    
    
    output 
    if [type] == "hillstone" 
    elasticsearch 
      hosts => "192.168.20.18:9200"
      index => "hillstone-%module-%+YYYY.MM.dd"
    
    
    if [type] == "h3c" 
    elasticsearch 
      hosts => "192.168.20.18:9200"
      index => "h3c-%+YYYY.MM.dd"
    
    
    if [type] == "vmware" 
    elasticsearch 
      hosts => "192.168.20.18:9200"
      index => "vmware-%+YYYY.MM.dd"
    
    
    if [type] == "vcenter" 
    elasticsearch 
      hosts => "192.168.20.18:9200"
      index => "vcenter-%+YYYY.MM.dd"
    
    
    if [type] == "windows" 
    elasticsearch 
      hosts => "192.168.20.18:9200"
      index => "%[@metadata][beat]-%[@metadata][version]"
    
    
    
    

    6.2 配置logstash服务

    启动logstash服务,并把mix.config 更改为logstash.conf ,放到/etc/logstash 目录下。

    systemctl enable logstash
    systemc   start logstash
    未完待续

    推荐查看英文文档方式

    找到一篇英文站点,将鼠标移动到url开始,添加icopy.site/回车。

    ? "icopy.site/"+"https://www.elastic.co"

    例如 : 源网址:https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

    ? 转译后网址:https://s0www0elastic0co.icopy.site/guide/en/logstash/current/plugins-filters-grok.html

    推荐理由:

    ? 1.比谷歌全文翻译更准确,而且关键代码不翻译。

    ? 2.如果你英文不好,或者看英文文档太累,可以试下哦。

? 推荐学习视频:

? ELK入门到实践

elk日志分析系统(代码片段)

...日志进行集中格式化;将日志格式化(logstash)并输出到Elasticsearch;对格式化后的数据进行索引和存储(Elasticsearch);前端数据的展示(Kibana)E:Elasticsearch,提供了一个分布式多用户能力的全文搜索引擎L:Logstash,一款强大的... 查看详情

elk日志分析系统搭建配置

...慢查询日志,tomcat运行日志以及系统日志等。介绍:ELK:ElasticSearch+LogStash+Kibana=ElkStackElasticSearch:存储、收索、分析(可以用solr替代)LogStash:收集器,输入,处理分析,存储到ESKibana:展示备注:ElasticSearch支持集群功能,日志... 查看详情

快速搭建elk日志分析系统(代码片段)

...tps://www.elastic.co/cn/官网权威指南:https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html安装指南:https://www.elastic.co/guide/en/elasticsearch/reference/5.x/rpm.htmlELK是Elasticsearch、Logstash、Kibana的简称,这三者是核心套件,但并非全部。Elastics... 查看详情

linuxelk日志分析系统|logstash日志收集|elasticsearch搜索引擎|kibana可视化平台|架构搭建|超详细(代码片段)

LinuxELK日志分析系统|logstash日志收集|elasticsearch搜索引擎|kibana可视化平台|架构搭建|超详细ELK日志分析系统1.日志服务器2.ELK日志分析系统3日志处理步骤一、Elasticsearch介绍1.1概述1.2核心概念二、Kibana介绍三ELK架构搭建3.1配置要求3.... 查看详情

十分钟搭建和使用elk日志分析系统

...试环境日志的目的,准备采用EK+filebeat实现日志可视化(ElasticSearch+Kibana+Filebeat)。题目为“十分钟搭建和使用ELK日志分析系统”听起来有点唬人,其实如果单纯满足可视化要求,并且各软件都已经下载到本地,十分钟是可以搭... 查看详情

elk(elasticsearch,logstash,kibana)搭建实时日志分析平台

ELK(ElasticSearch,Logstash,Kibana)搭建实时日志分析平台 日志主要包括系统日志、应用程序日志和安全日志。系统运维和开发人员可以通过日志了解服务器软硬件信息、检查配置过程中的错误及错误发生的原因。经常分析日志可以... 查看详情

elk日志分析系统(代码片段)

...日志进行集中格式化;将日志格式化(logstash)并输出到Elasticsearch;对格式化后的数据进行索引和存储(Elasticsearch);前端数据的展示(Kibana)二、搭建ELK日志分析系统:第一步:先配置elasticsearch环境(1)修改两台主机名,分... 查看详情

震惊全网的elk日志分析系统(齐全详细理论+搭建步骤图释)(代码片段)

...、前言概述与基础理论1.1ELK系统简介1.2ELK日志工作原理1.3Elasticsearch、Logstash、Kibana详细介绍1.3.1Elasticsearch1.3.2Logstash1.3.3Kibana二、ELK日志分析系统搭建2.1实验环境2.2部署Elasticsearch软件(node1和node2都需部署)2.3安装elasticsearc... 查看详情

elk(elasticsearch+logstash+kibana)开源日志分析平台搭建

环境介绍System: CentOS7.2x86_64hostname: elk-server.huangming.orgIPAddress:10.0.6.42、10.17.83.42本篇的ELK环境为单机部署方式,即将ELK所有的软件包都安装在一台服务器上,配置如下:CPU: 4cMem: 8GDisk: 50一、Elasticsearch安装1 查看详情

centos下使用elk套件搭建日志分析和监控平台

1概述ELK套件(ELKstack)是指ElasticSearch、Logstash和Kibana三件套。这三个软件可以组成一套日志分析和监控工具。由于三个软件各自的版本号太多,建议采用ElasticSearch官网推荐的搭配组合:http://www.elasticsearch.org/overview/elkdownloads/2环... 查看详情

elk日志分析系统搭建

注:/usr/local/src为源码安装包存放目录。/data/为数据存储、解压目录。准备工作:下载elasticsearch、filebeat、Kibana_Hanization、elasticsearch-head、jdk、kibana、logstash、nodewgethttps://download.oracle.com/otn-pub/java/jdk/8u191-b12/2787e4 查看详情

十分钟搭建和使用elk日志分析系统(代码片段)

...试环境日志的目的,准备采用EK+filebeat实现日志可视化(ElasticSearch+Kibana+Filebeat)。题目为“十分钟搭建和使用ELK日志分析系统”听起来有点唬人,其实如果单纯满足可视化要求,并且各软件都已经下载到本地,十分钟是可以搭... 查看详情

elk+kafka构建日志收集系统之环境安装(代码片段)

ELK+kafka构建日志收集系统之环境安装1.背景ELK由Elasticsearch、Logstash和Kibana三部分组件组成;Elasticsearch是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机... 查看详情

elk企业级日志分析系统(代码片段)

...搭建部署百度云盘部署搭建所需ELK压缩包可自提:ELKElasticsearch集群部署(在Node1、Node2节点上操作)1.环境准备2.部署Elasticsearch软件(1)安装elasticsearch—rpm包(2)加载系统服务(3)修改ela... 查看详情

windows10下elk环境快速搭建实践

...,而开源实时日志分析ELK平台完美的解决上述矛盾。ELK由ElasticSearch(ES)、Logstash和Kiabana三个开源工具组成。ES是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口... 查看详情

elk-5.4.1搭建日志管理系统elasticsearch安装

Elasticsearch 安装 安装环境操作系统:CentOS6.6IP地址:192.168.5.81 软件包:系统自带yum源关闭防火墙关闭SELinux安装前准备elasticsearch基于java环境运行,使用前需要在服务器中安装jdk。elasticsearch5.4需要jdk8以上版本。(本文... 查看详情

elk(elasticsearch,logstash,kibana)搭建实时日志分析平台

ELK平台介绍在搜索ELK资料的时候,发现这篇文章比较好,于是摘抄一小段:以下内容来自:http://baidu.blog.51cto.com/71938/1676798日志主要包括系统日志、应用程序日志和安全日志。系统运维和开发人员可以通过日志了解服务器软硬件... 查看详情

2018年elasticsearch6.2.2教程elk搭建日志采集分析系统(目录)

...知识点说明,ES搜索接口演示,部署的ELK项目演示章节二elasticSearch6.2版本基础讲解到阿里云部署实战2、搜索引擎知识介绍和相关框架简介:介绍搜索的基本概念,市面上主流的搜索框架elasticSearch和solr等对比什么是搜索:在海量... 查看详情