关键词:
为了系统间解耦,我们通常会引入MQ
框架,大家各司其职共同完成上下游的业务流程。
大致过程:
生产端,创建一条消息,通过网络发送到MQ Server
MQ将 消息存储在topic 的一个分区里
消费端,从分区中拉取消息,消费处理
但现实往往不一样!MQ 架构设计要满足高并发、高性能、高可用等指标
单分区,达不到我们的吞吐量要求,我们考虑采用多分区
架构设计,正所谓 ”三个臭皮匠赛过一个诸葛亮“,多分区可以有效分摊全局压力,提升整体系统性能。
两台 MQ机器,组成一个集群,原先一个分区存储6条消息
,现在分摊到两个分区,每个分区各存储3条消息
,性能比上面那个提升一倍。
貌似可以满足我们的需求,但任何事情都有两面性!
我们看看下面业务场景:
一个用户在电商网站上下订单到交易完成,中间会经历一系列动作,订单的状态也会随之变化,一个订单会产生多条MQ消息,下单
、付款
、发货
、买家确认收货
,消费端需要严格按照业务状态机的顺序处理,否则,就会出现业务问题。
我们发现,消息带上了状态,不再是一个个独立的个体,有了上下文依赖关系!
对于这个问题,突然想到HTTP协议
,其本身也是无状态的,也就是说前后两次请求没有关联,但有些业务功能有登录要求,那怎么解决?
引入Cookie机制,每次请求客户端额外传输一些数据,来达到上下文关联。
回到MQ的消息顺序问题,我们要如何解决?
答案:各退一步,保证局部有序。
比如上面的电商例子,只要保证一个订单的多条状态消息在同一个分区,便可以满足业务需求,这个方案可以覆盖大部分的业务场景。
这里面只需要有一个路由策略
组件,由它决定消息该放到哪个分区中!
考虑到市面MQ开源框架很多,常见的如:Kafka、Pulsar、RabbitMQ、RocketMQ 等,API方法略有区别,但设计思路是相通的。
接下来,我们以 RocketMQ
为例:
生产端提供了一个接口 MessageQueueSelector
public interface MessageQueueSelector
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
接口内定义一个select方法,具体参数含义:
mqs:该Topic下所有的队列分片
msg:待发送的消息
arg:发送消息时传递的参数
关于MessageQueueSelector
接口,RocketMQ 框架提供了三个默认实现类:
1、SelectMessageQueueByHash:
arg参数的hashcode的绝对值,然后对mqs.size()取余,得到目标队列在mqs的下标
2、SelectMessageQueueByRandom:
对mqs.size()值取随机数作为目标队列在mqs的下标
3、SelectMessageQueueByMachineRoom
返回null
特别注意:
虽然保证了单个分片的消息有序,但每个分片的消费者只能是单线程处理,因为多线程无法控制消费顺序。这个可能会损失一些性能。
这里又引出另一个问题,如何保证一个队列只能有一个消费端呢?
1、
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
遍历一个topic下所有的
MessageQueue
isOrder && !this.lock(mq)
尝试对它加锁,确保一个MessageQueue
只能被一个消费者处理
2、将PullRequest
对象放入PullMessageService
的pullRequestQueue
队列中
public void dispatchPullRequest(List<PullRequest> pullRequestList)
for (PullRequest pullRequest : pullRequestList)
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, , add a new pull request ", consumerGroup, pullRequest);
3、org.apache.rocketmq.client.impl.consumer.PullMessageService#run
PullMessageService
是一个Runnable
线程任务无限循环,从队列中拉取、处理消息
另一个问题,如何保证一个队列,只有一个线程在处理消息呢?
1、 DefaultMQPushConsumerImpl#pullMessage
ConsumeMessageService
中有两个实现类,因为我们有消费顺序要求,会选择ConsumeMessageOrderlyService
来处理业务
2、 ConsumeMessageOrderlyService.ConsumeRequest
从
ConcurrentMap
中获取messageQueue
对应的锁对象通过
synchronized
关键字,线程来抢占锁,互斥关系,从而保证了一个MessageQueue
只能有一个线程并发处理
继续往下看,如果扩容了怎么办?
原来有6个分区,order_id_1
的消息在MessageQueue6
中,此时扩容一倍,现在12个分区,order_id_1
订单后面产生的消息可能路由到了MessageQueue8
中,同一个订单的消息分布在两个分区中,无法保证顺序。
我们能做的是,先将存量消息处理完,再扩容。如果是在线业务,可以搞个临时topic,先将消息暂时堆积,待扩容后,按新的路由规则重新发送。
顺序消息,如果某条失败了怎么办?会不会一直阻塞?
1、如果失败,不会提交消费位移,系统会自动重试(有重试上限),此时会阻塞后面的消息消费,直到这条消息处理完
2、如果这个消息达到重试上限,依然失败,会进入死信队列
,可以继续处理后面的消息
有道无术,术可成;有术无道,止于术
欢迎大家关注Java之道公众号
好文章,我在看❤️
mq消息队列(代码片段)
...点1.3消息队列的缺点1.4消息队列比对2、RabbitMQ2.1、RabbitMQ如何保证消息不被重复消费2.2、RabbitMQ如何保证消息不丢失2.2.1生产者丢数据2.2.2消息队列丢数据2.2.3消费者丢数据2.3、RabbitMQ如何保证消息有序2.4、RabbitMQ如何处理消息堆积... 查看详情
互联网面试必杀:如何保证消息中间件全链路数据100%不丢失(代码片段)
...路到底如何保证数据不能丢失。这个问题,在互联网公司面试的时候高频出现,而且也是非常现实的生产环境问题。如果你的简历中写了自己熟悉MQ技术(RabbitMQ、RocketMQ、Kafka),而且在项目里有使用的经验,那么非常实际的一... 查看详情
面试rabbitmq面试题(代码片段)
文章目录什么是MQ什么是列队为什么使用MQMQ的优点消息队列有什么优缺点?RabbitMQ有什么优缺点?Kafka、ActiveMQ、RabbitMQ、RocketMQ有什么优缺点?MQ有哪些常见问题?如何解决这些问题?什么是RabbitMQ?rabbitmq... 查看详情
互联网面试必杀:如何保证消息中间件全链路数据100%不丢失:第二篇(代码片段)
前情提示上一篇文章《互联网面试必杀:如何保证消息中间件全链路数据100%不丢失:第一篇》,我们初步介绍了之前制定的那些消息中间件数据不丢失的技术方案遗留的问题。一个最大的问题,就是生产者投递出去的消息,可能... 查看详情
互联网面试必杀:如何保证消息中间件全链路数据100%不丢失:第四篇(代码片段)
前情提示上篇文章:《互联网面试必杀:如何保证消息中间件全链路数据100%不丢失:第三篇》,我们分析了RabbitMQ开启手动ack机制保证消费端数据不丢失的时候,prefetch机制对消费者的吞吐量以及内存消耗的影响。通过分析,我... 查看详情
消息队列中,如何保证消息的顺序性?(代码片段)
问:如何保证消息的顺序性?面试官心理分析其实这个也是用MQ的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这是生产系统中常见的问题。面试题剖析我举个例子,我们以... 查看详情
rabbitmq面试问题(代码片段)
1、MQ如何避免消息堆积,使MQ提高消费者的速度? 提高消费者速率(集群)、消费者批量获取消息。2、MQ如何避免消费者重复消费(幂等问题) 基于Redis的setNx命令。 基于数据库表字段... 查看详情
面试官再问我如何保证rocketmq不丢失消息,这回我笑了!(代码片段)
最近看了@JavaGuide发布的一篇『面试官问我如何保证Kafka不丢失消息?我哭了!』,这篇文章承接这个主题,来聊聊如何保证RocketMQ不丢失消息。0x00.消息的发送流程一条消息从生产到被消费,将会经历三个阶段:生产阶段,Producer... 查看详情
关于mq的几件小事如何保证消息不丢失(代码片段)
1.mq原则数据不能多,也不能少,不能多是说消息不能重复消费,这个我们上一节已解决;不能少,就是说不能丢失数据。如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的。2.丢失数据场景... 查看详情
浅谈rocketmq如何保证消息不丢失(代码片段)
RocketMQ如何保证消息不丢失?一、明确丢失消息场景二、RocketMQ避免消息丢失解决方案1、消息生产者使用事务消息2、配置同步刷盘+Dledger主从架构3、消息消费者端使用同步消费机制4、设计降级方案MQ如何避免消息不丢失... 查看详情
浅谈rocketmq如何保证消息不丢失(代码片段)
RocketMQ如何保证消息不丢失?一、明确丢失消息场景二、RocketMQ避免消息丢失解决方案1、消息生产者使用事务消息2、配置同步刷盘+Dledger主从架构3、消息消费者端使用同步消费机制4、设计降级方案MQ如何避免消息不丢失... 查看详情
mq——如何保证消息不会丢失(代码片段)
一条消息从生产到消费完成这个过程,可以划分三个阶段,为了方便描述,我给每个阶段分别起了个名字。生产阶段:在这个阶段,从消息在Producer创建出来,经过网络传输发送到Broker端。存储阶段:在这个阶段,消息在Broker端存... 查看详情
消息中间件的6大高频问题和答案(代码片段)
《Java面试必知必会系列》继续更新ing,今天给大家分享一些消息中间件相关的面试问题,希望你在面试中遇到后,可以回答的上来。当然关于消息中间件的内容其实是非常非常多的,这里只是列举出来一些高频的... 查看详情
rabbitmq面试题总结,非常详细,杜绝标题党,不详细你打我,下次不写博客了(代码片段)
...总结,虽然我也是个小白,最近也在找工作,但是我们上网看面试题的时候,发现这些所谓的大博主,什么狗屁新星,没一点原创,抄的可怕,标明原创也就算了,还一字不漏的给抄下来,有些一字不漏的复制我理解,因为大伙说法都那样,但是... 查看详情
美女面试官问我:同步屏障和异步消息的运行机制(代码片段)
...小安】关注还在移动开发领域苟活的大龄程序员,“面试系列”文章将在公众号同步发布。1.前言通过昨天的技术交流,天才少年成功闯过一关,来到二面现场。2.正文哎呀,怎么面试官跟昨天的是同一个人,... 查看详情
美女面试官问我:同步屏障和异步消息的运行机制(代码片段)
...小安】关注还在移动开发领域苟活的大龄程序员,“面试系列”文章将在公众号同步发布。1.前言通过昨天的技术交流,天才少年成功闯过一关,来到二面现场。2.正文哎呀,怎么面试官跟昨天的是同一个人,... 查看详情
消息队列的面试题3
1、面试题 如何保证消息不被重复消费啊(如何保证消息消费时的幂等性)? 2、面试官心里分析 其实这个很常见的一个问题,这俩问题基本可以连起来问。既然是消费消息,那肯定要考虑考虑会不会重复消费?能不... 查看详情
rabbitmq如何保证消息可靠性(代码片段)
个人博客请访问 http://www.x0100.top 如何保证消息的可靠性?消息队列如何进行限流?如何设置延时队列进行延时消费?1.📖如何保证消息的可靠性?先来看看我们的万年老图,从图上... 查看详情