mysql数据同步到elasticsearch(代码片段)

autofelix autofelix     2023-02-05     574

关键词:

〝 古人学问遗无力,少壮功夫老始成 〞

要通过elasticsearch实现数据检索,首先要将mysql中的数据导入elasticsearch,并实现数据源与elasticsearch数据同步,这里使用的数据源是Mysql数据库,目前mysql与elasticsearch常用的同步机制大多是基于插件实现的。如果这篇文章能给你带来一点帮助,希望给飞兔小哥哥一键三连,表示支持,谢谢各位小伙伴们。

目录

一、同步远离

 二、logstash-input-jdbc

三、go-mysql-elasticsearch

四、elasticsearch-jdbc

五、logstash-input-jdbc实现同步

六、go-mysql-elasticsearch实现同步

七、elasticsearch-jdbc实现同步


一、同步远离

  • 基于Mysql的binlog日志订阅:binlog日志是Mysql用来记录数据实时的变化
  • Mysql数据同步到ES中分为两种,分别是全量同步增量同步
  • 全量同步表示第一次建立好ES索引之后,将Mysql中所有数据一次性导入到ES中
  • 增量同步表示Mysql中产生新的数据,这些新的数据包括三种情况,就是新插入Mysql中的数据,更新老的数据,删除的数据,这些数据的变动与新增都要同步到ES中

 二、logstash-input-jdbc

  • logstash官方插件,集成在logstash中,下载logstash即可,通过配置文件实现mysql与elasticsearch数据同步
  • 优点
  • 能实现mysql数据全量和增量的数据同步,且能实现定时同步
  • 版本更新迭代快,相对稳定
  • 作为ES固有插件logstash一部分,易用
  • 缺点
  • 不能实现同步删除操作,MySQL数据删除后Elasticsearch中数据仍存在
  • 同步最短时间差为一分钟,一分钟数据同步一次,无法做到实时同步

三、go-mysql-elasticsearch

  • go-mysql-elasticsearch 是国内作者开发的一款插件
  • 优点
  • 能实现mysql数据全量和增量的数据同步
  • 缺点
  • 无法实现数据全量同步Elasticsearch
  • 仍处理开发、相对不稳定阶段

四、elasticsearch-jdbc

  • elasticsearch-jdbc 目前最新的版本是2.3.4,支持的ElasticSearch的版本为2.3.4, 未实践
  • 优点
  • 能实现mysql数据全量和增量的数据同步
  • 缺点
  • 目前最新的版本是2.3.4,支持的ElasticSearch的版本为2.3.4
  • 不能实现同步删除操作,MySQL数据删除后Elasticsearch中数据仍存在

五、logstash-input-jdbc实现同步

  • 第一步安装:
  • logstash5.x之后,集成了logstash-input-jdbc插件。安装logstash后通过命令安装logstash-input-jdbc插件
cd /logstash-6.4.2/bin
./logstash-plugin install logstash-input-jdbc
  • 第二步配置:
  • 在logstash-6.4.2/config文件夹下新建jdbc.conf,配置如下
  • 在logstash-6.4.2/config 目录下新建jdbc.sql文件
select * from t_employee
  • 第三步运行
cd logstash-6.4.2
# 检查配置文件语法是否正确
bin/logstash -f config/jdbc.conf --config.test_and_exit
# 启动
bin/logstash -f config/jdbc.conf --config.reload.automatic
  • --config.reload.automatic:会自动重新加载配置文件内容

  • 在kibana中创建索引后查看同步数据

PUT octopus
GET octopus/_search

六、go-mysql-elasticsearch实现同步

  • 第一步:mysql binlog日志
  • go-mysql-elasticsearch通过mysql中binlog日志实现数据增加,删除,修改同步elasticsearch
  • mysql的binlog日志主要用于数据库的主从复制与数据恢复。binlog中记录了数据的增删改查操作,主从复制过程中,主库向从库同步binlog日志,从库对binlog日志中的事件进行重放,从而实现主从同步。
  • mysql binlog日志有三种模式,分别为:
ROW:   记录每一行数据被修改的情况,但是日志量太大
STATEMENT:   记录每一条修改数据的SQL语句,减少了日志量,但是SQL语句使用函数或触发器时容易出现主从不一致
MIXED:   结合了ROW和STATEMENT的优点,根据具体执行数据操作的SQL语句选择使用ROW或者STATEMENT记录日志
  • 要通过mysql binlog将数据同步到ES集群,只能使用ROW模式,因为只有ROW模式才能知道mysql中的数据的修改内容。

  • 以UPDATE操作为例,ROW模式的binlog日志内容示例如下:

SET TIMESTAMP=1527917394/*!*/;
    BEGIN
    /*!*/;
    # at 3751
    #180602 13:29:54 server id 1  end_log_pos 3819 CRC32 0x8dabdf01     Table_map: `webservice`.`building` mapped to number 74
    # at 3819
    #180602 13:29:54 server id 1  end_log_pos 3949 CRC32 0x59a8ed85     Update_rows: table id 74 flags: STMT_END_F
    
    BINLOG '
    UisSWxMBAAAARAAAAOsOAAAAAEoAAAAAAAEACndlYnNlcnZpY2UACGJ1aWxkaW5nAAYIDwEPEREG
    wACAAQAAAAHfq40=
    UisSWx8BAAAAggAAAG0PAAAAAEoAAAAAAAEAAgAG///A1gcAAAAAAAALYnVpbGRpbmctMTAADwB3
    UkRNbjNLYlV5d1k3ajVbD64WWw+uFsDWBwAAAAAAAAtidWlsZGluZy0xMAEPAHdSRE1uM0tiVXl3
    WTdqNVsPrhZbD64Whe2oWQ==
    '/*!*/;
    ### UPDATE `webservice`.`building`
    ### WHERE
    ###   @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
    ###   @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
    ###   @3=0 /* TINYINT meta=0 nullable=0 is_null=0 */
    ###   @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
    ###   @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    ###   @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    ### SET
    ###   @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */
    ###   @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */
    ###   @3=1 /* TINYINT meta=0 nullable=0 is_null=0 */
    ###   @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */
    ###   @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    ###   @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */
    # at 3949
    #180602 13:29:54 server id 1  end_log_pos 3980 CRC32 0x58226b8f     Xid = 182
    COMMIT/*!*/;
  • STATEMENT模式下binlog日志内容示例为:
SET TIMESTAMP=1527919329/*!*/;
    update building set Status=1 where Id=2000
    /*!*/;
    # at 688
    #180602 14:02:09 server id 1  end_log_pos 719 CRC32 0x4c550a7d  Xid = 200
    COMMIT/*!*/;
  • 从ROW模式和STATEMENT模式下UPDATE操作的日志内容可以看出,ROW模式完整地记录了要修改的某行数据更新前的所有字段的值以及更改后所有字段的值,而STATEMENT模式只单单记录了UPDATE操作的SQL语句。我们要将mysql的数据实时同步到ES, 只能选择ROW模式的binlog, 获取并解析binlog日志的数据内容,执行ES document api,将数据同步到ES集群中。

  • 查看,修改binlog模式

# 查看binlog模式
mysql> show variables like "%binlog_format%";

# 修改binlog模式
mysql> set global binlog_format='ROW';

# 查看binlog是否开启
mysql> show variables like 'log_bin';

# 开启bīnlog
修改my.cnf文件log-bin = mysql-bin
  • 第二步安装
# 安装go
sudo apt-get install go

# 安装godep
go get github.com/tools/godep

# 获取go-mysql-elasticsearch插件
go get github.com/siddontang/go-mysql-elasticsearch

# 安装go-mysql-elasticsearch插件
cd go/src/github.com/siddontang/go-mysql-elasticsearch
make
  • 第三步配置
  • go/src/github.com/siddontang/go-mysql-elasticsearch/etc/river.toml
# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "127.0.0.1:3306"     # 需要同步的mysql基本设置
my_user = "root"
my_pass = "root"

# Elasticsearch address
es_addr = "127.0.0.1:9200"     # 本地elasticsearch配置

# Path to store data, like master.info, and dump MySQL data 
data_dir = "./var"             # 数据存储的url
# 以下配置保存默认不变
# Inner Http status address
stat_addr = "127.0.0.1:12800"

# pseudo server id like a slave 
server_id = 1001

# mysql or mariadb
flavor = "mysql"
# mysqldump execution path
mysqldump = "mysqldump"

# MySQL data source
[[source]]
schema = "test"             //elasticsearch 与 mysql 同步时对应的数据库名称

# Only below tables will be synced into Elasticsearch.
# 要同步test这个database里面的几张表。对于一些项目如果使用了分表机制,我们可以用通配符来匹配,譬如t_[0-9]4,就可# 以匹配 table  t_0000 到 t_9999。
tables = ["t", "t_[0-9]4", "tfield", "tfilter"]  

# Below is for special rule mapping
# 对一个 table,我们需要指定将它的数据同步到 ES 的哪一个 index 的 type 里面。如果不指定,我们默认会用起 schema  # name 作为 ES 的 index 和 type
[[rule]]
schema = "test"    //数据库名称
table = "t"        //表名称
index = "test"        //对应的索引名称
type = "t"            //对应的类型名称

# 将所有满足格式 t_[0-9]4 的 table 同步到 ES 的 index 为 test,type 为 t 的下面。当然,这些表需要保证
# schema 是一致的
[[rule]]
schema = "test"
table = "t_[0-9]4"
index = "test"
type = "t"

# 对于 table tfilter,我们只会同步 id 和 name 这两列,其他的都不会同步
filter = ["id", "name"]
# table tfield 的 column id ,我们映射成了 es_id,而 tags 则映射成了 es_tags
# list 这个字段,他显示的告知需要将对应的 column 数据转成 ES 的 array type。这个现在通常用于 MySQL 的 varchar # 等类型,我们可能会存放类似 “a,b,c” 这样的数据,然后希望同步给 ES 的时候变成 [a, b, c] 这样的列表形式。

[rule.field]
# Map column `id` to ES field `es_id`
id="es_id"
# Map column `tags` to ES field `es_tags` with array type 
tags="es_tags,list"
# Map column `keywords` to ES with array type
keywords=",list"
  • 第四步运行 
cd go/src/github.com/siddontang/go-mysql-elasticsearch
bin/go-mysql-elasticsearch -config=./etc/river.toml

七、elasticsearch-jdbc实现同步

[root@autofelix /]# vi /etc/profile
export JDBC_IMPORTER_HOME=/elasticsearch-jdbc-2.3.2.0
  • 使环境变量生效
[root@autofelix /]# source /etc/profile
[root@autofelix /]# ll /odbc_es/
drwxr-xr-x 2 root root 4096 Jun 16 03:11 logs
-rwxrwxrwx 1 root root 542 Jun 16 04:03 mysql_import_es.sh
  • 第二步:新建脚本mysql_import_es.sh,内容如下
[root@autofelix odbc_es]# cat mysql_import_es.sh
’#!/bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib
echo '
"type" : "jdbc",
"jdbc": 
"elasticsearch.autodiscover":true,
"elasticsearch.cluster":"my-application", #簇名,详见:/usr/local/elasticsearch/config/elasticsearch.yml
"url":"jdbc:mysql://10.8.5.101:3306/test", #mysql数据库地址
"user":"root", #mysql用户名
"password":"123456", #mysql密码
"sql":"select * from cc",
"elasticsearch" : 
  "host" : "10.8.5.101",
  "port" : 9300
,
"index" : "myindex", #新的index
"type" : "mytype" #新的type

'| java \\
  -cp "$lib/*" \\
  -Dlog4j.configurationFile=$bin/log4j2.xml \\
  org.xbib.tools.Runner \\
  org.xbib.tools.JDBCImporter
  • 第三步:为 mysql_import_es.sh 添加可执行权限。
[root@autofelix odbc_es]# chmod a+x mysql_import_es.sh
  • 第四步:执行脚本mysql_import_es.sh
[root@autofelix odbc_es]# ./mysql_import_es.sh

mysql数据同步到elasticsearch(代码片段)

〝古人学问遗无力,少壮功夫老始成〞要通过elasticsearch实现数据检索,首先要将mysql中的数据导入elasticsearch,并实现数据源与elasticsearch数据同步,这里使用的数据源是Mysql数据库,目前mysql与elasticsearch常用的... 查看详情

mysql数据同步到elasticsearch(代码片段)

〝古人学问遗无力,少壮功夫老始成〞要通过elasticsearch实现数据检索,首先要将mysql中的数据导入elasticsearch,并实现数据源与elasticsearch数据同步,这里使用的数据源是Mysql数据库,目前mysql与elasticsearch常用的... 查看详情

elasticsearch推荐一个同步mysql数据到elasticsearch的工具

1.概述转载:https://elasticsearch.cn/article/756 查看详情

logstash同步mysql数据到elasticsearch(代码片段)

目录1MySql数据到Elasticsearch1.1下载logstash1.2解压logstash1.3在logstash目录创建mysql文件夹1.4将mysql驱动文件和数据库查询文件放进mysql中1.5在config目录下创建mysqltoes.conf文件1.6mysqltoes.conf配置1.7启动logstash2配置语法讲解3启动方式4filebeat基... 查看详情

20.elasticsearch-jdbc实现mysql同步到elasticsearch(es与关系型数据库同步)

1.如何实现mysql与elasticsearch的数据同步?逐条转换为json显然不合适,需要借助第三方工具或者自己实现。核心功能点:同步增、删、改、查同步。2、mysql与elasticsearch同步的方法有哪些?优缺点对比?目前该领... 查看详情

mysql准实时同步数据到elasticsearch(代码片段)

...出是ES,so相应的插件应该是logstash-input-jdbc和logstash-output-elasticsearch。安装插件的命令分别是(在Logs 查看详情

mysql到elasticsearch实时数据同步实操分享

...TapdataCloud,可以非常方便地完成MySQL数据实时同步到Elasticsearch,跟大家分享一下,希望对你有帮助。本次MySQL数据实时同步到Elasticsearch大概只花了几分钟就完成。使用的工具是TapdataCloud,这个工具是永久免费的。M... 查看详情

mysql到elasticsearch实时数据同步实操分享

...TapdataCloud,可以非常方便地完成MySQL数据实时同步到Elasticsearch,跟大家分享一下,希望对你有帮助。本次MySQL数据实时同步到Elasticsearch大概只花了几分钟就完成。使用的工具是TapdataCloud,这个工具是永久免费的。M... 查看详情

[es和mysql数据库同步]推荐一个同步mysql数据到elasticsearch的工具

...18-08-1416:27MCTW回复zqc0512_(ω」∠)_没办法啊同学。曾经用过elasticsearch-jdbc,不是很能满足需求啊。如果字段存的是竖线分隔的标签:"金融|大数据|工作平台",希望传到es变成字符串数组["金融","大数据","工作平台"],这种轮子该怎... 查看详情

canal实时同步mysql数据到elasticsearch(部署,配置,测试)(代码片段)

...改配置创建从库权限账号创建测试数据库创建测试数据表elasticsearch配置创建索引建立映射canal的下载部署下载canal配置服务端canal-deployer配置客户端canal-adaptercanal-adapter启动报错问题同步测试建立es索引和mysql表的映射插入mysql数据... 查看详情

elasticsearch-jdbc实现mysql同步到elasticsearch深入详解(代码片段)

Elasticsearch最少必要知识实战教程直播回放1.如何实现mysql与elasticsearch的数据同步?逐条转换为json显然不合适,需要借助第三方工具或者自己实现。核心功能点:同步增、删、改、查同步。2、mysql与elasticsearch同步的方... 查看详情

mysql数据同步至elasticsearch的相关实现方法

Python: MySQL数据同步到ES集群(MySQL数据库与ElasticSearch全文检索的同步)通过logstash将mysql数据同步至es中Springboot+ElasticSearch构建博客检索系统-学习笔记01Springboot+ElasticSearch构建博客检索系统-学习笔记02P43 43.新闻案例-数据... 查看详情

elasticsearch与mysql数据同步(代码片段)

...送MQ消息4.1事务配置类4.2service代码5.接收MQ消息数据同步elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步。一.思路分析常见的... 查看详情

elasticsearch学习笔记-p8(实现elasticsearch与mysql的数据同步)(代码片段)

文章目录MQ实现elasticsearch与mysql的数据同步1.思路分析1.1同步调用1.2异步通知1.3监听binlog1.4选择2.MQ实现数据同步2.1思路2.2声明交换机、队列(1)引入依赖(2)声明队列交换机名称(3)声明队列交换机2.4发... 查看详情

elasticsearch实战(四十二)-数据离线同步技术选型

...平时工作中,需要把存储在三方存储系统中的数据同步到ElasticSearch中,比如Mysql/PostgreSQL/Cassandra/HBase 将数据离线同步到ElasticSearch中,他们中间的数据传输需要通过三方中间件,这边数据离线同步有以下两种方案:  &n... 查看详情

elasticsearch实战(四十二)-数据离线同步技术选型

...平时工作中,需要把存储在三方存储系统中的数据同步到ElasticSearch中,比如Mysql/PostgreSQL/Cassandra/HBase 将数据离线同步到ElasticSearch中,他们中间的数据传输需要通过三方中间件,这边数据离线同步有以下两种方案:  &n... 查看详情

docker搭建elasticsearchkibanalogstash同步mysql数据到es(代码片段)

一、前言在数据量大的企业级实践中,Elasticsearch显得非常常见,特别是数据表超过千万级后,无论怎么优化,还是有点力不从心!使用中,最首先的问题就是怎么把千万级数据同步到Elasticsearch中,在一些开源框架中知道了,有... 查看详情

elasticsearch同步mysql

ElasticSearch同步Mysql的插件选择了elasticsearch-jdbc,理由是活跃度高,持续更新,最新版本兼容elasticsearch-2.3.3.一、下载下载地址:https://github.com/jprante/elasticsearch-jdbc下载后解压,里面有bin、lib2个目录.二、mysql配置确保mysql能用,在mysql... 查看详情