kafka新老集群平滑迁移实践(代码片段)

NetWhite NetWhite     2023-01-27     320

关键词:

前言

公司一直使用云上的kafka服务,随着业务规模和体量的增大,使用云上的服务成本相对比较高,所以考虑本地自建kafka集群对外提供服务。

因此,需要把正在运行的还在使用云上kafka的业务服务迁移到本地自建集群上。

要求

  • 代码改动小
  • 升级过程中的稳定性
  • 升级后消息发送与消费的正确性

迁移方案

双写/双读

顾名思义,生产端:消息同时发送新、老集群,消费端:同时消费两个集群的消息。

等到确认新集群的稳定和消息正确性后,逐渐下掉对老集群的依赖。

发送端双写还好做,难点在于消费端消费迁移实现上,主要可能有以下几种问题:

  • 如果采用先消费到备库上,后续备库再切换为主库,很多业务在其目前场景下其实很难实现。
  • 消费端对新集群消息的消费逻辑只是空转意义不大,如果期望检测消费的新、老集群的消息一致性,开发成本也是比较高。
  • 消费端不采用双读方案,不消费新集群的消息。最后直接切换到新集群开始消费,这样需要保证消费的幂等性。但是很多场景下是无法保证的,比如使用了第三方大数据相关的组件。
  • 采用双写/双读的方案,很多项目相关负责的同学,肯定也是无法接受的,毕竟代码改造太多了,开发成本太高。

所以,优先不考虑这种方案,采用了下面这种数据同步的方案。

数据同步

采用消息同步工具,将老集群的消息直接同步到新集群,客户端不再需要双读/双写,最后切换的时候直接修改为新集群的地址,重新发布即可。

整个过程如下:

 最后下掉老集群。

这样无论对发送端或消费端都是是极其友好,且“平滑”。

我最终考虑并采用的是这个方案,唯一的问题便是迁移过程中的所有问题和压力都从业务侧的同学转移到我们这边,比如:发送的消息如何同步,如何避免消费端切换后,重复消费或者漏掉消息未消费

迁移过程

迁移基本流程正如前面流程图展示的:数据同步->迁移生产端/消费端。

生产端和消费端没有先后切换新集群上的顺序要求,但是如果先把生产端切换到新集群,消费端就无法从老集群继续消费消息了,需要在消息的过期时间内,赶紧也切换到新集群。

但是如果消费端先切,则发送端可以在之后的任何时间。

消息同步

消息同步是第一步,kafka的消息同步工具在业内有做的比较好的商业版提供,同时也有开源版本供使用。

我选取的是官方自带的kafka-mirror-maker工具。

但是不能直接拿来用,否则同步过来的消息无法继续我下面的方案。

kafka-mirror-maker的默认实现就是消费老集群指定topic的消息并重新发送到新集群,且发送的时候未指定分区。但是我需要保证新老集群上每条消息在每个分区上的顺序保持一致,不能出现消息在老集群的分区0上,同步后被发送到新集群的分区1上。

因此进行适当改造,如下,构造消息的时候指定发送的分区:

  private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler 
    override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = 
      val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp
      Collections.singletonList(new ProducerRecord(record.topic, record.partition, timestamp, record.key, record.value, record.headers))
    
  

重新编译打包。

其实执行同步前,确保要同步的topic,已经在新集群创建并且新、老集群的分区数保持一致。

最后,同步的时候,我们并不需要一次性把所有的topic消息都向新集群同步。有针对性的处理,要迁移哪个服务,同步对应该topic的消息,迁移完成,停掉对应的同步进程,然后继续下个服务。

当然为了方便,我同时开发了对应的启停脚本,尽可能方便、规范的进行。

下面是我计划的协作流程:

因为我们自建集群开启了ACL,老集群并没有,所以中间涉及到ACL相关的处理。

消费位点同步

消费端切换集群这一步问题也是最多最复杂的。

对于生产者来说,因为作了消息同步,新老集群消息及消息在分区的位置都是一致的,所以直接修改地址切换新集群继续发送即可。

对于消费端来说,如何在切换到新集群后,还能继续相对于老集群中原来的消费位点继续消费是一个问题,需要保证消息不会重复消费,也不会丢失消费。所以计算在新集群中的消费位点是很重要的。

正常情况下,kafka是没有创建消费组的功能,我们现在要做的就是,如果消费端切换新集群后,就已经知道现在要做的这个消费组要从这个位点继续消费了。

对于这个问题,我开发了一个功能,可以新增订阅:

 输入一个消费组,并选择集群已经存在topic,创建订阅关系,示例如下,我创建一个订阅test_topic的消费组:

如果现在消费组为test_consumer的消费端,现在要从老集群切换到新集群继续消费,我们只要保证它不会出现重复消费,也不会漏掉消息即可。

所以这里解决方案的关键点就是需要将每个分区的消费位点,进行重置,重置的位置就是想当于在老集群中消费到的消息位点。

这个位点计算还是比较麻烦的,因为新老集群中各分区位点是一定不一致的。比如对于test_topic,可能此时的最小位点是1001(1001以前的消息过期删除了),日志最大位点是1300,所以现在实际某个分区有300条消息。但是同步到新集群,位点是从0开始,该分区在新集群最大位点就是300。

如果同步进程运行了好久,消费端才迁移新集群,新老集群的消息留存时间又不太一致,都删过几次过期消息,则老集群可能日志的最小位点是3001,最大位点是3300,新集群的最小位点是1000,最大位点是2300,两个集群现在留存的消息数都不一致了。

所以我这里采用了两阶段的同步方案:

  1. 准备同步消息的时候,进行位点标记
  2. 准备迁移消费端的时候,进行消费位点同步

大概就是上面说明的,通过相关的标记和计算确保新老集群消费的相对位点在各处场景下都是一致的,同步功能做了必要的效验,会保证同步的时候环境是预期的否则同步不了。如果最后同步阶段失败,则清除新集群中该topic相关数据,重新同步,重新执行这个流程即可(兜底解决方案)。

数据同步问题

数据同步过程中也是可能出现问题的,有些难以预料且致命的,如果消费端还没迁移,我们可以清除数据,重新同步,影响不大。

如果在同步过程中,同步进程挂掉,重启可能导致新老集群数据不一致。因为同步也是一边消费老集群一边发送到新集群,所以无法保证在挂掉重启的时候,是否会重复同步那一批次消息。

这个时候也是有解决方案的:

上面说过,在同步前进行了相关位移对齐,这时候可以查看相关对齐信息,人工重置用来同步的消费组的消费位点来保证消息一致。

最后,如何确认新老集群的消息是一致。这个时候假定同步过程,消息体没有出问题(这个出问题确实不好校验,认命了),只要查看对齐记录里的位移信息进行计算,确认从同步一来,新老集群每个分区的消息数都是一样,就可以确认消息一致的(不保证消息体的内容也没问题,比如丢包导致的)。

markdownhdfs不同集群迁移实践(代码片段)

查看详情

markdownhbase的(凤凰)集群迁移实践(代码片段)

查看详情

kafka数据迁移(代码片段)

...应用场景中,数据迁移是一个比较常见的问题。关于Kafka集群数据如何迁移,今天笔者将为大家详细介绍。2.内容本篇博客为大家介绍两种迁移场景,分别是同集群数据迁移、跨集群数据迁移。如下图所示:2.1同集群迁移同集群... 查看详情

kafka集群监控安全机制与最佳实践(代码片段)

Kafka监控安装Kafka集群监控方案选择:Kafka只能依靠kafka-run-class.sh等命令进行管理KafkaManager(CMAK)是目前比较常用的监控工具,它有如下功能:管理多个集群轻松检查群集状态(主题,消费者,偏移,代理,副本分发,分区分发... 查看详情

消息中间件学习总结(24)——rocketmq集群平滑迁移数据中心记录(代码片段)

...的数据中心迁移方式。架构简述:1、我们迁移rocketmq集群之前两个数据中心直接通过20GB的裸光互连。2、我们在新的当前数据中心的rocketmq架构是ÿ 查看详情

消息中间件学习总结(24)——rocketmq集群平滑迁移数据中心记录(代码片段)

...的数据中心迁移方式。架构简述:1、我们迁移rocketmq集群之前两个数据中心直接通过20GB的裸光互连。2、我们在新的当前数据中心的rocketmq架构是ÿ 查看详情

clickhouse集群迁移实践(代码片段)

clickhouse集群迁移实践原文链接背景现有的ck集群没有副本,磁盘是12块盘的stat盘,存在磁盘故障导致数据丢失的风险,而依赖zk的双副本策略又由于zk性能存在瓶颈影响集群的可用性,故而使用带三副本的高效云盘... 查看详情

kafka集群扩容后的topic分区迁移(代码片段)

kafka集群扩容后的topic分区迁移 ./bin/kafka-topics.sh--zookeepernode3:2181,node4:2181,node5:2181 --alter--topicdftt--partitions4kafka集群扩容后,新的broker上面不会数据进入这些节点,也就是说,这些节点是空闲的;它只有在创建新的topic时才... 查看详情

消息中间件学习总结(24)——rocketmq集群平滑迁移数据中心记录(代码片段)

...的数据中心迁移方式。架构简述:1、我们迁移rocketmq集群之前两个数据中心直接通过20GB的裸光互连。2、我们在新的当前数据中心的rocketmq架构是,两个nameserver和三组主从架构的broker节点,迁移到新的数据中心是两个n... 查看详情

hp下kafka的实践(代码片段)

...(Feed)分类,每一类的消息称之为一个主题(Topic)。broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic),并从Broker 查看详情

elasticsearch7.5.0+kibana-7.5.0+cerebro-0.8.5集群生产环境安装配置及通过elasticsearch-migration工具做新老集群数据迁移

一、服务器准备目前有两台128G内存服务器,故准备每台启动两个es实例,再加一台虚机,共五个节点,保证down一台服务器两个节点数据不受影响。二、系统初始化参见我上一篇kafka系统初始化:https://www.cnblogs.com/mkxfs/p/12030331.html... 查看详情

kafkaeagle监控mrskafka之操作实践(代码片段)

...软件,提供丰富的kafka监控指标,例如:Kafka集群的Broker数、Topic数、Consumer数、TopicLogSizeTop10、TopicCapacityTop10、Lag挤压,CPU/Memory监控等。Eagle新版本中改名为EFAK,基于版本3.0.1对接监控MRS3.1.0(普通集群࿰... 查看详情

hdfspb级数据无感迁移实践(代码片段)

...前言前面文章笔者介绍了比较多关于HDFSRBF如何来解决多集群协调管理的问题,RBF里的Router服务在里面扮演着一个很重要的角色。它能让众多的NN对于client来说完全透明,鉴于这一点,我们可以很好地依托Router服务来做... 查看详情

docker搭建kafka集群(代码片段)

Docker搭建kafka集群集群规划镜像版本kafka为什么需要依赖zookeeper创建docker网络搭建zk集群新建文件docker-compose-zk.yml启动搭建kafka集群新建三个挂载文件挂载原因挂载步骤新建docker-compose-kafka.yml启动集群安装kafka-manager新建docker-compose-... 查看详情

kafka集群搭建(代码片段)

目录1下载或复制Kafka安装包2解压缩Kafka安装包3配置Kafka集群4Kafka的初步应用4.1创建主题4.2发送消息4.3消费消息1下载或复制Kafka安装包http://archive.apache.org/dist/kafka/0.10.1.0/2解压缩Kafka安装包tar-zxvfkafka_2.11-0.10.1.0.gz3配置Kafka集群配置Ka... 查看详情

kafka集群搭建(代码片段)

1.启动Serverhttp://kafka.apache.org/quickstartForKafka,asinglebrokerisjustaclusterofsizeone,sonothingmuchchangesotherthanstartingafewmorebrokerinstances.#下载kafka[[email protected]kafka]#tar-xzfkafk 查看详情

kafka-集群部署(代码片段)

第2章Kafka集群部署 2.1环境准备2.1.1集群规划hadoop102hadoop103hadoop104zkzkzkkafkakafkakafka2.1.2jar包下载http://kafka.apache.org/downloads.html 2.2Kafka集群部署  1)解压安装包[[email protected]software]$tar-zxvfk 查看详情

kafka集群部署(代码片段)

前提:kafka集群依赖于zk集群,没有zk集群环境的请先参考 http://www.cnblogs.com/yjmyzz/p/4587663.html .假设搭建3个节点的kafka集群,下面是步骤: 一、下载http://kafka.apache.org/downloads,如果只是安装,直接downkafka_2.12-0.11.0.0.tgz即... 查看详情