如何保证消息队列里的数据顺序执行?

HollisChuang HollisChuang     2023-01-12     533

关键词:

使用MQ的时候,经常会有按顺序消费的需求,比如大数据团队为了做数据分析,会把数据库里数据同步到其他系统做一些数据统计分析。同步MySQL的时候,为了保证数据同步的实时性,会在中间加一个MQ,多个线程来消费MQ里的数据。

这种同步一般是读取binlog数据,你在MySQL里增改删了数据,对应出来就是3条增改删binlog日志发送到MQ里面,消费的时候肯定必须要按照增改删的顺序执行。如果你换成删除、修改、增加,就导致数据乱套了。

图1 binlog同步

我们以kafka举例,看下哪些环节会出现数据顺序不一致情况,又怎么解决。

假设kafka分配了3个partition,kafka的一个特性就是,能保证写入一个partition中的数据一定是有顺序的。

生产者写的时候,可以指定一个key,比如是订单id作为key,这个id对应的数据一定会写到同一个partition中去,而且这个partition中的数据都是有顺序的。

图2 kafka partition

kafka的消费者开始消费partition中的数据,一个消费消费一个partition,一个partition只能被一个消费者消费,不会出现一个消费者同时消费多个partition的情况。假如现在有3个partition,你启动4个消费者,那么就会有一个消费者消费不到数据。

图3 一个消费者消费一个partition

到目前为止,每个消费者消费到的数据都是有顺序性的。但消费者内部如果是单线程的,效率就会比较低,如果生产者写入kafka的数据量比较大,消费不及时,就会出现消息堆积的情况,所以消费者需要多线程的方式运行。

假如消费者里启动了3个线程,并发的来消费数据,线程之间如果不做同步控制,还是会导致数据乱掉。

图4 消费者多线程消费MQ

那如何保证kafka消费者多线程按顺序消费数据呢?

多个线程不能直接拿数据去处理,此时我们可以在同步系统中搞多个内存队列,消费者拿到数据之后,根据每条数据的key做hash取模,把相同id的数据分配到同一个内存队列中去。

每个内存队列里的数据都是有顺序性的,给每个内存队列都对应一个线程,去消费内存队列中的数据。

假如有3条增改删的数据,都是对同一个id的处理,那么hash取模后就会写入到同一个内存队列里去,由同一个线程去消费,然后按顺序写入数据库中。

如果消费者按照单线程消费处理,一条数据耗费几十毫秒,1秒钟只能处理十几条数据,吞吐量就会非常低。如果开启多线程的方式处理,就会几倍的提高吞吐量,同时也保证了数据的顺序性。

整个流程按这样的设计方案来处理,就可以保证数据的顺序性。

有道无术,术可成;有术无道,止于术

欢迎大家关注Java之道公众号

好文章,我在看❤️

rocketmq-如何实现顺序消息

...,BinLog消息得到顺序也必须保证是新增消息、修改消息。如何发送和消费顺序消息我们使用RocketMQ顺序消息来模拟一下订单的场景,顺序消息分为两部分:顺序发送、顺序消费。1.顺序发消息上面代码模拟了按顺序依次发送创建... 查看详情

消息队列中,如何保证消息的顺序性?(代码片段)

问:如何保证消息的顺序性?面试官心理分析其实这个也是用MQ的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这是生产系统中常见的问题。面试题剖析我举个例子,我们以... 查看详情

rabbitmq面试题:如何解决消息的顺序问题?---2022-04-03

参考技术ARabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序。消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味... 查看详情

rocketmq如何保证消费顺序

参考技术A集群模式下锁队列保证消息被同一个consumer消费,往broker定时发送锁命令本地消费时不论集群模式和广播模式都会有本地队列锁进行锁定保证同一个队列只会同时被一个消费者线程锁定本地消费时也按队列进行锁定操作... 查看详情

分布式场景下如何保证消息队列实现最终一致性(代码片段)

考虑一个分布式场景中一个常见的场景:服务A执行某个数据库操作成功后,会发送一条消息到消息队列,现在希望只有数据库操作执行成功才发送这条消息。下面是一些常见的作法:1.先执行数据库操作,再发送消息publicvoidpurch... 查看详情

如何保证rabbitmq消息顺序消费(代码片段)

问题分析所谓顺序消费,即保证消息的发送顺序要与消息的消费顺序保持一致。比如数据库对一条数据依次进行了 插入->更新->删除操作,这个顺序必须是这样,如果在同步过程中,消息的顺序变成了删除->... 查看详情

架构设计消息篇之保证消息顺序性

参考技术A上一篇文章我们讨论了消息中间件如何保证消息不丢失。这篇文章我们讨论一下使用消息中间件Kafka时,如何保证消息的顺序性。一般情况我们不太需要保证消息的顺序性,但在某些对顺序要求极其严格的场景下,需要... 查看详情

azuremessaging-servicebusmessaging消息队列技术系列3-消息顺序保证

...中我们以实际的使用场景来说明AzureMessaging是否支持以及如何编码实现:消息的收发顺序保证。消息的收发在实际业务中 查看详情

rocketmq使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信

...是可靠性优先的场景都应该使用同步从Consumer角度分析,如何保证消息被成功消费?Consumer保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消... 查看详情

消息队列面试题要点

...什么使用消息队列?使用消息队列有什么缺点?消息队列如何选型?如何保证消息队列是高可用的?如何保证消息不被重复消费?如何保证消费的可靠性传输?如何保证消息的顺序性?下面围绕以上七点进行阐述。需要说明一下... 查看详情

rocketmq使用顺序消息(代码片段)

...个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。核心思想是:发送消息时,可以通过实现MessageQueueSelector接口,选择消息发送到哪 查看详情

kafka如何保证消息顺序消费

参考技术A上一篇<<<Kafka如何实现分区及指定分区消费下一篇>>>Kafka如何保证高吞吐量推荐阅读:<<<消息中间件的核心思想<<<消息中间件常见问题汇总<<<基于Netty简单手写消息中间件思路<<<... 查看详情

如何保证消息的顺序性

...个partition,一个consumer,内部多线程,这不也明显乱了那如何保证消息的顺序性呢?简单简单:(1)rabbitmq:拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;或者就一个queue但是对应一个consumer,然后这... 查看详情

rocketmq使用顺序消息(代码片段)

...个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。核心思想是:发送消息时,可以通过实现MessageQueueSelector接口,选择消息发送到哪个队列,... 查看详情

消息队列的面试题5

 1、面试题 如何保证消息的顺序性? 2、面试官心里分析 其实这个也是用MQ的时候必问的话题,第一看看你了解不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这个生产系统中常见的问题。 ... 查看详情

16如何保证消息的顺序性

先看顺序会错乱的场景:RabbitMQ,一个queue,多个consumer,这不明显乱了;解决:1、拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实有点麻烦,2、或者就一个queue就是对应一个consumer,然后这个consumer内部调用内存... 查看详情

浅谈消息队列之rocketmq

...的情况 使用消息队列需要注意什么?系统复杂性增加如何保证消息队列是高可用,即做到集群高可用如何保证消费的可靠性传输,即不丢消息如何保证消息不被重复消费,即保证消费的幂等性如何保证消息的顺序性,即保证... 查看详情

分布式之消息队列复习精讲(代码片段)

...什么使用消息队列?使用消息队列有什么缺点?消息队列如何选型?如何保证消息队列是高可用的?如何保证消息不被重复消费?如何保证消费的可靠性传输?如何保证消息的顺序性?我们围绕以上七点进行阐述。需要说明一下,本... 查看详情