logstash:jdbcstaticfilterplugin介绍(代码片段)

中国社区官方博客 中国社区官方博客     2023-01-09     287

关键词:

在 Logstash 的过滤器中。有许多的过滤器是可以用来和外部的数据对 pipeline 里的数据进行丰富的。Jdbc_static 过滤器就是其中的一个。此过滤器使用从远程数据库预加载的数据丰富事件。我们之所以选择这个,是因为你不必频繁查询你的数据库,也不会给你的数据库带来很大的压力。

此过滤器最适合使用静态或不经常更改的参考数据(例如环境、用户和产品)来丰富事件。

此过滤器的工作方式是从远程数据库获取数据,将其缓存在本地内存中的 Apache Derby 数据库中,并使用查找来使用本地数据库中缓存的数据来丰富事件。 你可以将过滤器设置为一次加载远程数据(对于静态数据),或者你可以安排远程加载定期运行(对于需要刷新的数据)。

在本篇文章中,我们将详细地介绍如何使用这个过滤器来丰富数据。首先,我们可以参考我之前的另外一篇文章 “Logstash:运用 jdbc_streaming 来丰富我们的数据” 来设置如何访问 MySQL 数据以及在 Jdbc stack 过滤器中如何配置访问 MySQL。

安装 MySQL

如果你还没有安装及配置好你的 MySQL,请参照我之前的文章 “Logstash:把 MySQL 数据导入到 Elasticsearch 中” 来进行安装及配置好自己的 MySQL 数据库。记得把相应的 Connector 放入相应的文件目录中。

安装好我们的 MySQL 后,我们创建一个叫做 data 的数据库,并创建一个叫做 sensors 及一个叫做 users 的表格:

上面的两个表格非常简单。我们可以使用 MySQL 命令查看它们的内容:

mysql> describe sensors;
+-----------------+---------------+------+-----+---------+-------+
| Field           | Type          | Null | Key | Default | Extra |
+-----------------+---------------+------+-----+---------+-------+
| customer        | varchar(255)  | YES  |     | NULL    |       |
| room            | varchar(255)  | YES  |     | NULL    |       |
| floor           | varchar(255)  | YES  |     | NULL    |       |
| department      | varchar(255)  | YES  |     | NULL    |       |
| locationOnFloor | varchar(255)  | YES  |     | NULL    |       |
| sensorType      | varchar(255)  | YES  |     | NULL    |       |
| buildingName    | varchar(255)  | YES  |     | NULL    |       |
| longitude       | double(255,0) | YES  |     | NULL    |       |
| latitude        | double(255,0) | YES  |     | NULL    |       |
| id              | int(255)      | NO   | PRI | NULL    |       |
+-----------------+---------------+------+-----+---------+-------+
10 rows in set (0.00 sec)

mysql> select * from sensors;
+----------+------+---------+-------------+-----------------+-------------+--------------+-----------+----------+----+
| customer | room | floor   | department  | locationOnFloor | sensorType  | buildingName | longitude | latitude | id |
+----------+------+---------+-------------+-----------------+-------------+--------------+-----------+----------+----+
| Elastic  | 101  | Floor 1 | Engineering | Desk 102        | Temperature | 222 Broadway |       -74 |       41 |  1 |
+----------+------+---------+-------------+-----------------+-------------+--------------+-----------+----------+----+
1 row in set (0.00 sec)
mysql> describe users;
+-----------+--------------+------+-----+---------+----------------+
| Field     | Type         | Null | Key | Default | Extra          |
+-----------+--------------+------+-----+---------+----------------+
| userid    | int(11)      | NO   | PRI | NULL    | auto_increment |
| firstname | varchar(255) | YES  |     | NULL    |                |
| lastname  | varchar(255) | YES  |     | NULL    |                |
+-----------+--------------+------+-----+---------+----------------+
3 rows in set (0.00 sec)

mysql> select * from users;
+--------+-----------+----------+
| userid | firstname | lastname |
+--------+-----------+----------+
|      2 | xiaoguo   | liu      |
+--------+-----------+----------+
1 row in set (0.00 sec)

这样,我们就完成了 MySQL 的部署。

使用 Jdbc 过滤器来丰富数据

接下来,我们想使用 sensors 表格中的 id 以及 users 表格中的 userid 来对我们的事件进行丰富。我们来创建如下的一个 Logstash 的配置文件:

logstash.conf

input 
  generator 
    message => "id=1&userid=2"
    count => 1
  


filter 
	kv 
		source => "message"
		field_split => "&?"
    

    mutate 
         convert => 
             "id" => "integer"
             "userid" => "integer"
         
    

   jdbc_static 
    loaders => [ 
      
        id => "remote-sensors"
        query => "select id, customer, room, sensorType from sensors"
        local_table => "sensors"
      ,
      
        id => "remote-users"
        query => "select firstname, lastname, userid from users order by userid"
        local_table => "users"
            
    ]
    local_db_objects => [ 
      
        name => "sensors"
        index_columns => ["id"]
        columns => [
          ["id", "int"],
          ["customer", "varchar(255)"],
          ["room", "varchar(255)"],
          ["sensorType", "varchar(255)"]
        ]
      ,
      
        name => "users"
        index_columns => ["userid"]
        columns => [
          ["firstname", "varchar(255)"],
          ["lastname", "varchar(255)"],
          ["userid", "int"]
        ]
      
    ]
    local_lookups => [ 
      
        id => "local-servers"
        query => "select customer, room, sensorType from sensors WHERE id = :id"
        parameters => id => "[id]"
        target => "server"
      ,
      
        id => "local-users"
        query => "select firstname, lastname from users WHERE userid = :userid"
        parameters => userid => "[userid]"
        target => "user" 
      
    ]
    
    # using add_field here to add & rename values to the event root
    add_field =>  sensor_name => "%[server][0][customer] %[server][0][room] %[server][0][sensortype]" 
    add_field =>  customer_name => "%[user][0][firstname] %[user][0][lastname]"    
    # remove_field => ["server", "user"]
    jdbc_user => "root"
    jdbc_password => "1234"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # jdbc_driver_library => ""
    jdbc_connection_string => "jdbc:mysql://localhost:3306/data?useTimezone=true&&serverTimezone=UTC"
     


output 
    stdout 
        codec => rubydebug
    

在上面,我们使用 generator 来创建一个事件。我们在 filter 的部分使用 kv 过滤器来对它进行解析。由于 id 和 userid 为字符串。它们和数据库表格中的字段属性是不一样的。我们使用 mutate 过滤器来对它们的类型进行转换。接下来,我们使用 jdbc_static 过滤器来对这个事件进行丰富。

我们通过如下的命令来启动 Logstash:

./bin/logstash -f logstash.conf 

我们可以在 terminal 中看到如下的结果:

从上面,我们可以看出来有两个新增加的字段 sensor_name 以及 customer_name。这两个字段是从 MySQL 中的表格中进行丰富而来。更多关于 Jdbc static 过滤器的介绍请参阅官方文档。 

logstash服务启动脚本

logstash服务启动脚本最近在弄ELK,发现logstash没有sysv类型的服务启动脚本,于是按照网上一个老外提供的模板自己进行修改#添加用户useraddlogstash-M-s/sbin/nologinmkdir/var/log/logstash/chown-Rlogstash:logstash/var/log/logstash/chown-Rlogstash:logstash/usr/ 查看详情

logstash|logstash&&logstash-input-jdbc安装

Windows系统:     1、安装Logstash       1.1进入官网下载zip包              [1] https://artifacts.el 查看详情

logstash grok 模式来监控 logstash 本身

】logstashgrok模式来监控logstash本身【英文标题】:logstashgrokpatterntomonitorlogstashitself【发布时间】:2016-05-0316:07:48【问题描述】:我想将logstash.log日志添加到我的ELK堆栈中,但我总是遇到grokparsefailure。我的模式在http://grokconstructor.a... 查看详情

logstash学习小记

logstash学习小记标签(空格分隔):日志收集IntroduceLogstashisatoolformanagingeventsandlogs.Youcanuseittocollectlogs,parsethem,andstorethemforlateruse(like,forsearching).–http://logstash.net自从2013年logstash被ES公司收购之后,ELK 查看详情

docker安装logstash(代码片段)

使用同版本镜像7.4.11、下载Logstash镜像dockerpulllogstash:7.4.1#查看镜像dockerimages 2、编辑logstash.yml配置文件logstash.yml配置文件放在宿主机/data/elk/logstash目录下,内容如下:path.config:/usr/share/logstash/conf.d/*.confpath.logs:/var/log/logstash ... 查看详情

logstash服务配置

配置文件:/usr/lib/systemd/system/logstash.service相关目录:logstash.conf位于/etc/logstash/conf.d中,logstash.yml位于/etc/logstash中[Unit]Description=LogstashDocumentation=http://www.elastic.coAfter=elasticsearch.service[Service]Environment=LS_HOME=/var/lib/logstashEnvironment=LS_HE... 查看详情

elk-logstash

logstash简介:  logstash日志分析的配置和使用   logstash是一个数据分析软件,主要目的是分析log日志。整一套软件可以当作一个MVC模型,logstash是controller层,Elasticsearch是一个model层,kibana是view层。   ... 查看详情

了解一下logstash

  Logstash 是一个应用程序日志、事件的传输、处理、管理和搜索的平台。你可以用它来统一对应用程序日志进行收集管理,提供 Web 接口用于查询和统计。  Logstash配置要求  Logstash支持Java1.7版本以上。  ... 查看详情

logstash(代码片段)

logstash文档地址:https://www.elastic.co/guide/en/logstash/index.htmllogstash命令接收输入并输出logstash-e‘inputstdinoutputstdout‘其它命令--node.nameNAME-f,--path.configCONFIG_PATH-e,--config.stringCONFIG_STRING 查看详情

Logstash: NoMethodError: 方法 `>=' for nil:NilClass Logstash

】Logstash:NoMethodError:方法`>=\\\'fornil:NilClassLogstash【英文标题】:Logstash:NoMethodError:method`>=’fornil:NilClassLogstashLogstash:NoMethodError:方法`>=\'fornil:NilClassLogstash【发布时间】:2016-06-1314:37:05【问题描述】:这是我的配置文 查看详情

logstash上报数据到elasticsearch(代码片段)

logstash上报数据到elasticsearch前提是已经安装、部署完logstash和elasticsearchWindows安装启动logstash_zhangphil的博客-CSDN博客_logstashwindows安装Windows安装启动logstash(1)下载logstash,下载链接:DownloadLogstashFree|GetStartedN 查看详情

centos7安装logstash(代码片段)

 下载建议到官网下载最新版https://www.elastic.co/cn/downloads/logstash本文使用logstash7.0.0https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.tar.gzwgethttps://artifacts.elastic.co/downloads/logstash/ 查看详情

logstash上报数据到elasticsearch(代码片段)

logstash上报数据到elasticsearch前提是已经安装、部署完logstash和elasticsearchWindows安装启动logstash_zhangphil的博客-CSDN博客_logstashwindows安装Windows安装启动logstash(1)下载logstash,下载链接:DownloadLogstashFree|GetStartedN 查看详情

cron检测并启动logstash(代码片段)

背景线上的logstash总是莫名其妙的挂了,我打算写一个定时任务,一分钟去检查一次logstash进程,不存在时就把它启动步骤编写检测启动脚本让cron定时来调用检测启动脚本1、编写脚本第一次完成是这个样子:#!/usr/bin/envbashpid_blog=... 查看详情

logstash:运用logstash对serviceapi数据进行分析(代码片段)

我记得在之前的文章“Logstash:使用ELK堆栈进行API分析”运用Logstash对一些指标的API进行分析。在今天的练习中,我将展示如何使用Logstash来对一些日志类的ServiceAPI进行分析。我们知道在很多的时候,我们可以很快速... 查看详情

logstash:运用logstash对serviceapi数据进行分析(代码片段)

我记得在之前的文章“Logstash:使用ELK堆栈进行API分析”运用Logstash对一些指标的API进行分析。在今天的练习中,我将展示如何使用Logstash来对一些日志类的ServiceAPI进行分析。我们知道在很多的时候,我们可以很快速... 查看详情

elk实验安装logstash

logstash可以理解为log的采集传输组件老样子第一步下载sudowgethttps://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz解压出来sudotar-zxvflogstash-6.2.4.tar.gz编辑一下配置配置ip和日志记录的级别vi/config/logstash.ymlhttp.host:"192. 查看详情

使用filebeat和logstash集中归档日志

方案Filebeat->Logstash->FilesFilebeat->Redis->Logstash->FilesNxlog(Rsyslog、Logstash)->Kafka->Flink(Logstash->ES-Kibana)其他方案(可根据自己需求,选择合适的架构,作者选择了第二种方案)注释: 由于Logstash无法处理输出到文 查看详情