系统重构数据同步利器之canal实战篇(代码片段)

浅谈架构 浅谈架构     2022-11-28     660

关键词:

一、背景

二话不说,先上图

上图来自于官网(https://github.com/alibaba/canal),基本上涵盖了目前生产环境使用场景了,众所周知,Canal做数据同步已经是行业内标杆了。我们生产环境也用Canal监听binlog数据变更,然后解析成对应数据发送到MQ(RocketMQ)。一些非主流程业务,异步场景消费MQ处理即可。

但是我这篇文章,主要想聊一聊在做系统重构时,新老系统数据双向同步时Canal的使用场景。

注:关于系统重构的介绍我这里就不叙述了,大家可以看我之前写的系列文章:浅谈系统重构

二、关于双向同步
  1. 什么是双向同步?

所谓双向同步,就是老系统数据库数据往新系统数据库同步,新系统数据库同时也往老系统数据库同步。从而保证新系统,老系统数据库数据完全一致。系统重构时如果上线出现问题,随时能切回原来老系统,这也为灰度方案提供了底层保障。

  1. 一般同步如何做?各自优缺点是什么?

方案一: Dao层拦截方案

方案说明: 在Dao层打洞拦截所有写请求(insert,update,delete), 然后写入MQ队列,再通过消费MQ队列写入对应数据库。

优点: 这种方案实现比较简单。

缺点: 对于老系统数据库,可能有很多个服务在写入,如果从Dao层拦截,可能要修改很多地方,改动较大。

方案二: 利用Canal订阅解析Binlog

方案说明: 利用Canal订阅Binlog,解析成数据,再写入到对应数据库(这里可以直接写入,也可以先写入MQ,再消费MQ写入,推荐后者)。

优点:能够解决系统多处写入问题。

缺点:引入新的组件Canal,复杂度增加。

下面,我们就来实战操作一下方案二。

三、环境准备(Centos系统为例)1. 安装Mysql
wget https://dev.mysql.com/get/mysql80-community-release-el8-1.noarch.rpm
yum install  mysql80-community-release-el8-1.noarch.rpm

#
禁用centos自带的mysql
yum module disable mysql -y
#安装
yum install mysql-community-server -y
#启动
systemctl start mysqld
#查看启动状态 提升 Active: active (running) 表示成功
systemctl status mysqld
#查看初始密码
grep \'temporary password\' /var/log/mysqld.log
#初始密码登录
mysql -uroot -p\'AXXXXX\'  -hlocalhost -P3306
#修改ROOT密码
ALTER USER \'root\'@\'localhost\' IDENTIFIED BY \'BXXXXX\';

2、 环境部署

1)、查看当前mysql是否开启了binlog模式, 如果log_bin的值为OFF是未开启,为ON是已开启 。

SHOW VARIABLES LIKE \'%log_bin%\'

2)、若未开启需要修改/ect/my.cnf 开启binlog模式

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

修改完之后重启mysql服务

3)、创建用户并且授权

create user canal@\'%\' IDENTIFIED by \'XXXX\';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO \'canal\'@\'%\';
FLUSH PRIVILEGES;

3、 Canal服务端安装

1)、canal下载地址

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2)、解压到指定目录

mkdir  canal-server-1.1.4
tar -zxf canal.deployer-1.1.4.tar.gz -C canal-server-1.1.4/

3)、修改配置文件 查看主库 binlog position

mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000002 |     4526 |              |                  |                   |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

修改配置文件 conf/example/instance.properties

# position info
canal.instance.master.address=IP:3306
# 这里对应上面的File
canal.instance.master.journal.name=binlog.000002
# 这里对应上面的Position
canal.instance.master.position=4526

#
 username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=XXXX

4)、启动 canal-server


./bin/startup.sh
# 查看日志
tail -f logs/example/example.log 

以上就完成了Canal-Server的单实例版本实现,生成环境集群环境一般是运维搭建,我们测试就用单实例版本。

关于Canal的HA机制设计下面简单介绍下,生产环境推荐使用。

canal的HA分为两部分,canal server和canal client分别有对应的HA实现


canal server:

为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.

canal client:

为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定),这里就不展开介绍了,有兴趣的同学可以看下官方wiki。

四、演示环节

由于canal组件封装的代码太多,我花了几个晚上业余时间写的(请点个赞吧),代码已经开源至gitee,有需要的同学可以clone下来看。

gitee地址: https://gitee.com/bytearch/fast-cloud

目前已支持simple直连模式和zookeeper集群模式

下面演示canal-client-demo

  1. 新建库order_center ,并且创建表order_info

    CREATE TABLE `order_info` (
      `order_id` bigint(20unsigned NOT NULL,
      `user_id` int(11DEFAULT \'0\' COMMENT \'用户id\',
      `status` int(11DEFAULT \'0\' COMMENT \'订单状态\',
      `booking_date` datetime DEFAULT NULL,
      `create_time` datetime DEFAULT NULL,
      `update_time` datetime DEFAULT NULL,
      PRIMARY KEY (`order_id`),
      KEY `idx_user_id` (`user_id`),
      KEY `idx_bdate` (`booking_date`),
      KEY `idx_ctime` (`create_time`),
      KEY `idx_utime` (`update_time`)
    ENGINE=InnoDB DEFAULT CHARSET=utf8;
  2. 添加处理器handler

    @CanalHandler(value = "orderInfoHandler", destination = "example", schema = "order_center", table = "order_info", eventType = CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT,CanalEntry.EventType.DELETE)
    public class OrderHandler implements Handler<CanalEntryBO

        @Override
        public boolean beforeHandle(CanalEntryBO canalEntryBO) 
            if (canalEntryBO == null
                return false;
            
            return true;
        

        @Override
        public void handle(CanalEntryBO canalEntryBO) 
            //1. 更新后数据解析
            OrderInfoDTO orderInfoDTO = CanalAnalysisUti.analysis(OrderInfoDTO.classcanalEntryBO.getRowData().getAfterColumnsList());
            System.out.println("event:" + canalEntryBO.getEventType());
            System.out.println(orderInfoDTO);
            //2. 后续操作 TODO
        


    1. 添加配置

      canal:
        clients:
          simpleInstance:
            enable: true
            mode: simple
            servers: XXXXX:11111
            batchSize: 1000
            destination: example
            getMessageTimeOutMS: 500
          #zkInstance:
          #   enable: true
          #   mode: zookeeper
          #   servers: 172.30.1.6:2181,172.30.1.7:2181,172.30.1.8:2181
          #   batchSize: 1000
          #   #filter: order_center.order_info
          #   destination: example
          #   getMessageTimeOutMS: 500

      配置说明:

      public class CanalProperties 
          /**
           * 是否开启 默认不开启
           */

          private boolean enable = false;
          /**
           * 模式
           * zookeeper: zk集群模式
           * simple: 简单直连模式
           */

          private String mode = "simple";

          /**
           * canal-server地址 多个地址逗号隔开
           */

          private String servers;

          /**
           * canal-server 的destination
           */

          private String destination;

          private String username = "";

          private String password = "";

          private int batchSize = 5 * 1024;

          private String filter = StringUtils.EMPTY;

          /**
           * getMessage & handleMessage 的重试次数, 最后一次重试会ack, 之前的重试会rollback
           */

          private int retries = 3;

          /**
           * getMessage & handleMessage 的重试间隔ms
           * canal-client内部代码 的重试间隔ms
           */

          private int retryInterval = 3000;

          private long getMessageTimeOutMS = 1000;

  1. 测试insert和update操作

    mysql> insert into order_info(order_id,user_id,status,booking_date,create_time,update_time) values(6666666,6,10,"2022-02-19 00:00:00","2022-02-19 00:00:00""2022-02-19 00:00:00");
    Query OK, 1 row affected (0.00 sec)

    mysql> update order_info set status=20 where order_id=66666;
    Query OK, 0 rows affected (0.00 sec)
    Rows matched: 0  Changed: 0  Warnings: 0

    mysql> 
  2. 测试结果

    2022-02-18 19:29:52.399  INFO 47706 --- [ lc-work-thread] c.b.s.canal.cycle.SimpleCanalLifeCycle   : 
    ****************************************************
    * Batch Id: [11] ,count : [3] , memsize : [189] , Time : 2022-02-18 19:29:52.399
    * Start : [binlog.000003:18893:1645183792000(2022-02-18 19:29:52.000)] 
    * End : [binlog.000003:19123:1645183792000(2022-02-18 19:29:52.000)] 
    ****************************************************

    2022-02-18 19:29:52.405  INFO 47706 --- [ lc-work-thread] c.b.s.canal.cycle.SimpleCanalLifeCycle   : 
    ----------------> binlog[binlog.000003:19056] , name[order_center,order_info] , eventType : INSERT ,tableName : order_info, executeTime : 1645183792000 , delay : 400ms

    event:INSERT
    OrderInfoDTOorderId=6666666, userId=6, status=10, bookingDate=2022-02-19 00:00:00, createTime=2022-02-19 00:00:00, updateTime=2022-02-19 00:00:00

    大功告成,到这一步就顺利完成了Canal订阅解析binlog步骤。

五、数据同步注意事项

抛下两个问题大家可以思考下

  1. 数据双向同步时,如何解决数据回环问题?

    例如新系统产生的数据,同步到老系统,不能又回流到新系统,如何解决?

  2. 数据顺序问题,如果写入到MQ,是否要保证顺序消费?如何实现?

  3. 当同步并发比较大,如何提高同步速度。

温馨提示: 此专题未完,以上问题我将在下一篇文章《系统重构数据同步利器之Canal实战篇-续》实现,大家可以提前思考一下。

六、 号外

欢迎大家关注”浅谈架构“ 公众号,不定期分享原创文章

有任何问题,欢迎私信我交流。


数据实时同步利器-canal(代码片段)

...制过程1.3.2Canal的工作原理1.4使用场景2.MySQL的准备2.1创建数据库2.2创建数据表2.3修改配置文件开启Binlog2.4重启MySQL使配置生效2.5测试Binlog是否开启2.6赋权限3.Canal的下载和安装3.1下载并解压Jar包3.2修改canal.properties的配置3.3修改instan... 查看详情

elasticsearch实战(四十七)-canal实现mysql数据实时同步方案(代码片段)

    Canal主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费        早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务trigger获取增量变更。从2010年... 查看详情

canal同步数据库实现(代码片段)

...库同步到本地。服务器可以自己使用不用的电脑安装ubuntu系统或者买阿里云服务器。一、安装mysql这里使用docker安装mysql。这 查看详情

使用canal解决mysql和redis数据同步(tcp)(代码片段)

前言之前写过一篇文章《使用canal解决Mysql和Redis数据同步问题》,也是使用canal实现mysql和redis的数据同步,和该篇文章不一样的是,上一篇是基于MQ实现数据同步,该篇文章是基于TCP方式来实现。工作原理分析我... 查看详情

使用canal解决mysql和redis数据同步(tcp)(代码片段)

前言之前写过一篇文章《使用canal解决Mysql和Redis数据同步问题》,也是使用canal实现mysql和redis的数据同步,和该篇文章不一样的是,上一篇是基于MQ实现数据同步,该篇文章是基于TCP方式来实现。工作原理分析我... 查看详情

如何入门爬虫(基础篇)

...所有订单Python爬虫实战六之抓取爱问知识人问题并保存至数据库Python爬虫实战七之计算大学本学期绩点Python爬虫实战八之利用Selenium抓取淘宝匿名旺旺三、爬虫利器Python爬虫利器一之Requests库的用法Python爬虫利器二之BeautifulSoup的... 查看详情

mybatis系列之实战篇(中)(代码片段)

Mybatis系列之实战篇(中)接着《Mybatis系列之实战篇(上)》,我们继续。 数据表实体类Province类packagecom.emerson.etao.entity.base.address;/***省份实体类**@authorChrisMao(Zibing)**/publicclassProvince p 查看详情

mybatis系列之实战篇(中)(代码片段)

Mybatis系列之实战篇(中)接着《Mybatis系列之实战篇(上)》,我们继续。 数据表实体类Province类packagecom.emerson.etao.entity.base.address;/***省份实体类**@authorChrisMao(Zibing)**/publicclassProvince p 查看详情

爬虫+自动化利器selenium之自学成才篇(代码片段)

...窗口切换表单切换弹窗处理❤系列内容❤爬虫+自动化利器selenium之自学成才篇(一)主要内容:selenium简介、selenium安装、安装浏览器驱动、8种方式定位页面元素、浏览器控制、鼠标控制、键盘控制爬虫+自动化... 查看详情

canal+rocketmq实现mysql与elasticsearch数据同步(代码片段)

1.引言在很多业务情况下,我们都会在系统中引入ElasticSearch搜索引擎作为做全文检索的优化方案。如果数据库数据发生更新,这时候就需要在业务代码中写一段同步更新ElasticSearch的代码。这种数据同步的代码跟业务代码... 查看详情

canal-clientadapter数据同步实验(代码片段)

背景canal1.1.1版本之后,内置增加客户端数据同步功能,Client适配器整体介绍:?ClientAdapterRDB适配器RDBadapter用于适配mysql到任意关系型数据库(需支持jdbc)的数据同步及导入测试支持的数据库列表:MYSQLORACLEPOSTGRESSSQLSERVERELASTICSEARCH...clientad... 查看详情

devops利器之docker入门篇(代码片段)

...遵循Apache2.0协议,代码托管在Github:Docker源码地址各大操作系统现都支持Docker,并且最新的Linux发行版RedHat、CentOS、Ubuntu中均已默认带有Docker软件包.Docker的构想是要实现“Build,ShipandRunAnyApp,Anywhere”,即通过对应用的封装(Packaging)、分... 查看详情

爬虫+自动化利器selenium之自学成才篇(代码片段)

...前页面对当前页面进行截图❤系列内容❤爬虫+自动化利器selenium之自学成才篇(一)主要内容:selenium简介、selenium安装、安装浏览器驱动 查看详情

elasticsearch实战(四十七)-canal实现mysql数据实时同步方案

    Canal主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费        早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务trigg... 查看详情

mysql实战篇之普通索引和唯一索引--01(代码片段)

...践changebuffer和redolog小结补充引言假设你在维护一个市民系统,每个人都有一个唯一的身份证号,而且业务代码已经保证了不会写入两个重复的身份证号。如果市民系统需要按照身份证号查姓名,就会执行类似这样的SQ... 查看详情

canal同步数据(代码片段)

canal同步数据canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。阿里系公司开始逐步的尝试基于数据库... 查看详情

消息队列之利器锋芒(代码片段)

  随着企业的发展,所用的系统越来越复杂。系统势必会发展成分布式系统。消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋,可靠投递,广播,最终一致性等问题。实现高性能,高可用... 查看详情

elasticsearch搭配canal构建主从复制架构实战(代码片段)

...-head插件安全保护前言elasticsearch通常在项目中用于做海量数据存储和全文搜索,在电商的商品搜索,论坛的发帖搜索中有广泛应用。但是一般这些数据会存储在关系型数据库中例如mysql,mysql数据库拥有良好的事务解... 查看详情