logstash同步mysql数据到elasticsearch(代码片段)

赵广陆 赵广陆     2023-01-30     321

关键词:


1 MySql数据到Elasticsearch

1.1 下载logstash


官网
https://www.elastic.co/cn/logstash/

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz

1.2 解压logstash

tar -zxvf logstash-6.6.0.tar.gz

1.3 在logstash 目录创建 mysql 文件夹

[root@bigdata01 logstash-6.6.0]# mkdir mysql

1.4 将 mysql 驱动文件和数据库查询文件 放进mysql中

1.5 在config 目录下创建 mysqltoes.conf 文件

1.6 mysqltoes.conf 配置

input 
  
  # 多张表的同步只需要设置多个jdbc的模块就行了
  jdbc 
      # mysql 数据库链接,shop为数据库名
      jdbc_connection_string => "jdbc:mysql://ip:3306/mall?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"
      # 用户名和密码
      jdbc_user => "root"
      jdbc_password => "root"

      # 驱动
      jdbc_driver_library => "/usr/local/logstash-6.6.0/mysql/mysql-connector-java-8.0.16.jar"

      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"

      #是否分页
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"

      #直接执行sql语句
      # statement =>"select * from t_item"
      # 执行的sql 文件路径+名称
      statement_filepath => "/usr/local/logstash-6.6.0/mysql/item.sql"
      
      # 默认列名转换为小写
      lowercase_column_names => "false"

      #设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      schedule => "* * * * *"

      # 索引类型
      #type => "jdbc"
    




output 
  elasticsearch 
        #es的ip和端口
        hosts => ["http://ip:9200"]
        #ES索引名称(自己定义的)
            index => "mall"
        #文档类型
        document_type => "mall_item"
        #设置数据的id为数据库中的字段
        document_id => "%iteId"
    
    stdout 
        codec => json_lines
    


1.7 启动 logstash

前台启动:

[root@bigdata01 bin]# ./logstash -f ../config/mysqltoes.conf

后台启动:

[root@bigdata01 bin]# nohup  ./logstash -f ../config/mysqltoes.conf >logstash.log &

2 配置语法讲解

logstash使用 来定义配置区域,区域内又可以包含其插件的区域配置。

# 最基本的配置文件定义,必须包含input 和 output。

input
    stdin 



output
    stdout
        codec=>rubydebug
    



# 如果需要对数据进操作,则需要加上filter段

input
    stdin 



filter
  # 里面可以包含各种数据处理的插件,如文本格式处理 grok、键值定义 kv、字段添加、
  # geoip 获取地理位置信息等等... 



output
    stdout
        codec=>rubydebug
    



# 可以定义多个输入源与多个输出位置

input
    stdin 
    
    file
        path => ["/var/log/message"]
        type => "system"
        start_position => "beginning"
    



output
    stdout
        codec=>rubydebug
    
    
     
    file 
        path => "/var/datalog/mysystem.log.gz"
        gzip => true
    
    

3 启动方式

# 通过手动指定配置文件启动

/bin/logstash -f /etc/logstash/conf.d/nginx_logstash.conf

# 以daemon方式运行,则在指令后面加一个 & 符号

/bin/logstash -f /etc/logstash/conf.d/nginx_logstash.conf &


# 如果是通过rpm包安装的logstash则可以使用自带的脚本启动

/etc/init.d/logstash start 

# 通过这种方式启动,logstash会自动加载 /etc/logstash/conf.d/ 下的配置文件

4 filebeat基本讲解

filebeat是基于原先 logstash-forwarder 的源码开发而来,无需JAVA环境,运行起来更轻便,无疑是业务服务器端的日志收集工具。

配 置

# 配置文件路径 "/etc/filebeat/filebeat.yml"
# 一个配置文件可以包含多个prospectors,一个prospectors可以包含多个path。


filebeat:
  # List of prospectors to fetch data.
  prospectors:
    # Each - is a prospector. Below are the prospector specific configurations
    -
      paths:
        - /var/log/messages
        
      input_type: log

      document_type: messages

    -
      paths:
        - /alidata/log/nginx/access/access.log.json

      input_type: log

      document_type: nginxacclog

    -
      paths:
        - /alidata/www/storage/logs/laravel.log

      input_type: log

      document_type: larlog

    -
      paths:
        - /alidata/www/500_error/500_error.log

      input_type: log

      document_type: error500

    -
      paths:
        - /alidata/www/deposit/deposit.log

      input_type: log

      document_type: deposit


    -
      paths:
        - /alidata/www/call_error.log

      input_type: log

      document_type: call_error

    -
      paths:
        - /alidata/www/weixin_deposit.log

      input_type: log

      document_type: weixin_deposit


    -
      paths:
        - /alidata/log/php/php-fpm.log.slow

      input_type: log

      document_type: phpslowlog

      # 多行处理
      multiline:
          pattern: '^[[:space:]]'
          negate: true
          match: after


    # Additional prospector

registry_file: /var/lib/filebeat/registry


############################# Libbeat Config ##################################
# Base config file used by all other beats for using libbeat features

############################# Output ##########################################

# 输出数据到 redis 

output:


  redis:

    host: "10.122.52.129"
    port: 6379
    password: "123456"


# 输出数据到 logstash ,一般两者选用其一
 
  logstash:
    hosts: ["10.160.8.221:5044"]


############################# Shipper #########################################

shipper:

# 打上服务器tag

  name: "host_2"
  
  
############################# Logging #########################################  
  
logging:  
  
  
  files:
  

    rotateeverybytes: 10485760 # = 10MB

filebeat主要配置就是这个配置文件了,设定好之后启动服务就会自动从源拉取数据发送到指定位置,当数据为普通行数据时,filebeat会自动为其添加字段信息,其中一项字段 @timestamp 为filebeat读取到这条数据的时间,默认格式为UTC时间,比中国大陆时间早8小时。

如果数据为json格式,而数据中已包含@timestamp字段,filebeat处理时会把@timestamp字段值替换为filebeat读取到该行数据的当前UTC时间。

5 实战运用

5.1 业务到redis到es之间迁移

nginx 日志格式配置

       log_format json '"@timestamp":"$time_iso8601",'
                 '"slbip":"$remote_addr",'
                 '"clientip":"$http_x_forwarded_for",'
                 '"serverip":"$server_addr",'
                 '"size":$body_bytes_sent,'
                 '"responsetime":$request_time,'
                 '"domain":"$host",'
                 '"method":"$request_method",'
                 '"requesturi":"$request_uri",'
                 '"url":"$uri",'
                 '"appversion":"$HTTP_APP_VERSION",'
                 '"referer":"$http_referer",'
                 '"agent":"$http_user_agent",'
                 '"status":"$status"';

filebeat 配置

filebeat:
  # List of prospectors to fetch data.
  prospectors:
    # Each - is a prospector. Below are the prospector specific configurations

    -
      paths:
        - /alidata/log/nginx/access/access.log.json

      input_type: log

      document_type: nginxacclog
      

############################# Output ##########################################

output:            
 
  logstash:
    hosts: ["10.160.8.221:5044"]

# 其他部分配置省略。

logstash 配置 (此处logstash用于接收filebeat的数据,然后转存redis)

input 
    beats 
    port => 5044
    codec => "json"





filter 
    if [type] == "nginxacclog" 
    geoip 
        source => "clientip"
        target => "geoip"
        database => "/u01/elk/logstash/GeoLiteCity.dat"
        add_field => [ "[geoip][coordinates]","%[geoip][longitude]" ]
        add_field => [ "[geoip][coordinates]","%[geoip][latitude]" ]


    mutate 
        convert => [ "[geoip][coordinates]","float" ]








output
    if [type] == "nginxacclog" 
    redis 
        data_type => "list"
        key => "nginxacclog"
        host => "127.0.0.1"
        port => "26379"
        password => "123456"
        db => "0"





    if [type] == "messages" 
    redis 
        data_type => "list"
        key => "messages"
        host => "127.0.0.1"
        port => "26379"
        password => "123456"
        db => "0"






logstash 配置 (此处logstash用于读取redis list中的数据,然后转存elasticsearch)

input
    redis 
        host => "10.10.1.2"
        port => "26379"
        db => "0"
        key => "nginxacclog"
        threads => 300
        password => "123456"
        data_type => "list"
        codec => "json"



    redis 
        host => "10.10.1.2"
        port => "26379"
        db => "0"
        key => "messages"
        password => "123456"
        threads => 50
        data_type => "list"
        codec => "json"






output 
    if [type] == "nginxacclog" 
        elasticsearch 
            hosts => ["127.0.0.1:9200"]
            index => "logstash-nginxacclog-%+YYYY.MM.dd"
            manage_template => true
            flush_size => 50000
            idle_flush_time => 10
            workers => 2





    if [type] == "messages" 
        elasticsearch 
            hosts => ["127.0.0.1:9200"]
            index => "logstash-messages-%+YYYY.MM.dd"
            manage_template => true
            flush_size => 50000
            idle_flush_time => 30
            workers => 1





关键指令解释:

threads 开启多少个线程读取redis数据,也就是从redis输入到logstash的速度,线程越多读取速度越快,但是根据接收节点的接收速度来设置,如果输入过快,接收速度不够,则会出现丢数据的情况,设置一个最佳的threads值需要和接收节点做反复测试才能得出。

flush_size 控制logstash向Elasticsearch批量发送数据,上面的配置表示,logstash会努力赞到50000条数据一次发送给Elasticsearch。

idle_flush_time 控制logstash多长时间向Elasticsearch发送一次数据,默认为1秒,根据以上配置,logstash积攒数据未到flush_size 10秒后也会向Elasticsearch发送一次数据。

workers 建议设置为1或2,如果机器性能不错可以设置为2. 不建议设置的更高。

5.2 业务到redis到mongo

filebeat 配置(从日志文件读取到的数据直接缓存至redis队列)

filebeat:
  # List of prospectors to fetch data.
  prospectors:
    # Each - is a prospector. Below are the prospector specific configurations

    -
      paths:
        - /alidata/log/nginx/access/access.log.json

      input_type: log

      document_type: nginxacclog


############################# Output ##########################################

output:


  redis:

    host: "10.160.8.221"
    port: 26379
    password: "123456"

document_type 自定义日志类型,在logstash中可通过type判断做不同的处理。

logstash 配置 (此处logstash用于读取redis list中的数据,然后转存mongodb)

input 

        redis 
            host => "10.160.8.221"
            port => "26379"
            key => "filebeat"
            data_type => "list"
            password => "123456"
            threads => 50
         
    

        redis 
            host => "10.160.8.221"
            port => "26379"
            key => "mycat"
            data_type => "list"
            password => "123456"
            threads => 50
            type => "mycat"
        
        



output 

if [type] == "mycat" 
        mongodb
            collection => "mycat%+yyyyMMdd"
            isodate => true
            database => "logdb"
            uri => "mongodb://log_user:123456@10.10.1.102:27017/logdb"
        


if [type_xi09wnk] == "nginxacclog" 
        mongodb
            collection => "nginx_accress%years_dik3k%months_dik3k%days_dik3k"
            isodate => true
            database => "logdb"
            uri => "mongodb://log_user:123456@10.10.1.102:27017/logdb"

        



logstash同步mysql数据到elasticsearch(代码片段)

目录1MySql数据到Elasticsearch1.1下载logstash1.2解压logstash1.3在logstash目录创建mysql文件夹1.4将mysql驱动文件和数据库查询文件放进mysql中1.5在config目录下创建mysqltoes.conf文件1.6mysqltoes.conf配置1.7启动logstash2配置语法讲解3启动方式4filebeat基... 查看详情

mysql同步数据到es(代码片段)

mysql同步数据到es常用两种方式1.使用logstash如果是历史数据同步我们可以用logstash,最快同步频率每分钟一次,如果对时效性要求高,慎用2.使用canal实时同步,本文章未演示使用logstash进行同步logstash特性:无需开发,仅需安装配置lo... 查看详情

elasticsearth同步数据库logstash《springboot集成elasticsearch-四》

参考技术Ahttps://artifacts.elastic.co/downloads/logstash/logstash-7.9.1.tar.gzjdbc驱动可以在maven仓库里复制出来另外两个文件是新建的mysql.sql随便写个select语句mysql.config./logstash-f../mysql/mysql.conflogstash只支持增量相关问题请查看https://blog.csdn.net/l... 查看详情

eslogstash数据同步入门

参考技术A官网地址:https://www.elastic.co/cn/logstashLogstash是一个功能强大的工具,可与各种部署集成。它提供了大量插件,可帮助你解析,丰富,转换和缓冲来自各种来源的数据。如果你的数据需要Beats中没有的其他处理,则需要将... 查看详情

mysql准实时同步数据到elasticsearch(代码片段)

4.安装JDK8、MySQL5.6驱动以及Logstash-6.0.0ECS中分别安装JDK8、MySQL5.6驱动以及Logstash-6.0.0。如下图:安装Logstashinput、output插件,此案例数据输入是MySQL,输出是ES,so相应的插件应该是logstash-input-jdbc和logstash-output-elasticsearch。安装插件... 查看详情

logstash(代码片段)

简略介绍:Logstash是elastic技术栈中的一员。它是一个数据采集引擎,可以从数据库采集数据到es中。我们可以通过设置自增id主键或者时间来控制数据的自动同步,这个id或者时间就是用于给logstash进行识别哪些数据需要同步的。... 查看详情

logstash(代码片段)

简略介绍:Logstash是elastic技术栈中的一员。它是一个数据采集引擎,可以从数据库采集数据到es中。我们可以通过设置自增id主键或者时间来控制数据的自动同步,这个id或者时间就是用于给logstash进行识别哪些数据需要同步的。... 查看详情

架构师成长记_第八周_22_logstash数据同步(代码片段)

文章目录1.logstash简介PS:2.logstash安装2.1(前提是安装好jdk)上传logstash,mysql驱动2.2logstash同步配置2.3启动logstashPS:1.logstash简介Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起... 查看详情

elasticsearch与mysql数据同步多个表(logstash)

参考技术Aproducts索引字段展示categorys索引字段展示配置文件内容展示(同时同步products和categorys)attribute索引字段展示products索引字段展示需要给两个txt文件相应的权限,详见单表操作同步后的products索引数据 查看详情

通过logstash全量和增量同步mysql一对多关系到elasticsearch(代码片段)

...xff0c;难免会使用到Elasticsearch做搜索。文章描述从Mysql通过Logstash实时同步到Elasticsearch,下面就开始来进行实现吧!具体的Elasticsearch+Logstash+kibana搭建, 查看详情

mysql数据同步到elasticsearch(代码片段)

...支持,谢谢各位小伙伴们。目录一、同步远离 二、logstash-input-jdbc三、go-mysql-elasticsearch四、elasticsearch-jdbc五、logstash-input-jdbc实现同步六、go-mysql-elasticsearch实现同步七、elasticsearch-jdbc实现同步一、同步远离基于Mysql的binlog日... 查看详情

mysql数据同步到elasticsearch(代码片段)

...支持,谢谢各位小伙伴们。目录一、同步原理 二、logstash-input-jdbc三、go-mysql-elasticsearch四、elasticsearch-jdbc五、logstash-input-jdbc实现同步六、go-mysql-elasticsearch实现同步七、elasticsearch-jdbc实现同步一、同步原理基于Mysql的binlog日... 查看详情

架构师成长记_第八周_22_logstash数据同步(代码片段)

文章目录1.logstash简介PS:2.logstash安装2.1(前提是安装好jdk)上传logstash,mysql驱动2.2logstash同步配置2.3启动logstashPS:1.logstash简介Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起... 查看详情

[es和mysql数据库同步]推荐一个同步mysql数据到elasticsearch的工具

...已有的轮子能轻易完成这个任务呢。2018-08-1417:36freedomcy007logstash不是完美支持吗?2018-08-1418:02MCTW回复freedomcy007对logstash了解的不深..它对于旧数据的更改、删除能增量同步嘛?能把"aa|bb|ccc"转成["aa","bb","ccc"]同步到es嘛?如果可以... 查看详情

elk——logstash(代码片段)

Logstash是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。官网介绍:https://www.elastic.co/cn/products/logstashhttps://www.elastic.co/downloads/logstash1、下载Logstash依赖JDK1.8... 查看详情

logstash将日志写入数据库mysql

#安装logstshcd/usr/local/wgethttps://artifacts.elastic.co/downloads/logstash/logstash-8.6.2-linux-x86_64.tar.gztar-xzvflogstash-8.6.2-linux-x86_64.tar.gzln-slogstash-8.6.2-linux-x86_64logstashexportPATH=$PATH:/usr/local/logstash/jdk/bin>>/etc/profilesource/etc/profile#安装logstash插件/us... 查看详情

elasticsearch数据库同步插件logstash

   1.下载和elasticsearch相同版本的logstash.  2.进行解压后,进入bin下,新建一个文件mysql.conf,并输入inputstdinoutputstdout3.cmd进入bin下,输入logstash-fmysql.conf 启动后,输入  http://127.0.0.1:9600/4.ok;说明已经启动了   查看详情

使用logstash和jdbc确保elasticsearch与关系型数据库保持同步

...数据保持同步。因此,在本篇博文中,我会演示如何使用Logstash来高效地复制数据并将关系型数据库中的更新同步到Elasticsearch中。本文中所列出的代码和方法已使用MySQL进行过测试 查看详情