rocketmq实现延时队列

author author     2023-04-09     767

关键词:

参考技术A 说明:rocketmq实现的延时队列只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时

具体实现:rocketmq发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早

流程图

源码分析:

如果想要深入了解的可以看一下ScheduleMessageService这个类

delayLevelTable定义了延迟级别和延迟时间的对应关系,offsetTable存放延延迟级别对应的队列消费的offset

使用timer定时器启动了一个定时任务,把每个扫描队列封装成一个任务,然后加入到timer中

每个扫描任务主要是把队列中所有到期的消息都拿出来,并发送到指定的topic下,并把延迟队列中的消息删除

总结

优点:设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性

缺点:定时器采用了timer,timer是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况

改进点:可以在每个延迟队列上各采用一个timer,或者使用timer进行扫描,加一个线程池对消息进行处理,这样可以提供效率

rocketmq延时消息实现原理探究(代码片段)

由于日常开发中遇到几次使用延时消息的场景,而且目前业务中使用到的消息中间件有rabbitmq和kafka,对延时消息的支持都不太理想。其中rabbitmq延时消息是通过设置队列ttl+死信exchange实现缺点嘛:每次都得设置两... 查看详情

springboot实战项目整合阿里云rocketmq消息队列实现发送普通消息,延时消息(代码片段)

原文地址:Springboot实战项目整合阿里云RocketMQ消息队列实现发送普通消息,延时消息--附代码-学不会丶-博客园一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例)首先RocketMQ是阿里巴巴自研出来的&#... 查看详情

mq任意延时消息实现原理概述

目录前置知识实现思路前置知识以RocketMQ为例使用IDR启动RocketMQ,参考Eclpse启动RocketMq大多数消息中间件都支持固定延时队列,比如RocketMQ支持的默认延时等级messageDelayLevel=1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h。参考RocketMQ延时配置Netty... 查看详情

rocketmq实现延迟队列精确到秒级实现

前言篇:为了节约成本,决定通过自研来改造rocketmq,添加任意时间延迟的延时队列,开源版本的rocketmq只有支持18个等级的延迟时间,其实对于大部分的功能是够用了的,但是以前的项目,全部都是使... 查看详情

springboot实战项目整合阿里云rocketmq消息队列实现发送普通消息,延时消息(代码片段)

原文地址:Springboot实战项目整合阿里云RocketMQ消息队列实现发送普通消息,延时消息--附代码-学不会丶-博客园一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例)首先RocketMQ是阿里巴巴自研出来的&#... 查看详情

如果有人再问你怎么实现分布式延时消息,这篇文章丢给他(代码片段)

1.背景上篇文章介绍了RocketMQ整体架构和原理有兴趣的可以阅读一下,在这篇文章中的延时消息部分,我写道开源版的RocketMQ只提供了18个层级的消息队列延时,这个功能在开源版中显得特别鸡肋,但是在阿里云中的RocketMQ却提供... 查看详情

rocketmq(10)——发送延时消息(代码片段)

...达了Broker就会写入消息队列,消费者就可以进行消费了。RocketMQ支持我们发送延时消费的消息,即现在发送的消息先发送到Broker,但是需要过一会才能进行消费。如果需要发送延时消息,只需要通过Message的setDelayDelevel()指定一个... 查看详情

rocketmq(10)——发送延时消息(代码片段)

...Broker就会写入消息队列,消费者就可以进行消费了。RocketMQ支持我们发送延时消费的消息,即现在发送的消息先发送到Broker,但是需要过一会才能进行消费。如果需要发送延时消息,只需要通过Message的setDelayDelevel(... 查看详情

rocketmq延时消息自定义配置;topic下tag使用(代码片段)

概述使用的是开源版本的rocketmq4.9.4rocketmq也是支持延时消息的。rocketmq一般是4个部分:nameserver:保存路由信息broker:保存消息生产者:生产消息消费者:消费消息延时消息的处理是在其中的broker中。但是rocketm... 查看详情

rocketmq源码—九rocketmq延时消息(代码片段)

...式,都是通过设置消息被投递的时间来实现的,但是ApacheRocketMQ在版本4.2.0中尚不支持指定时间的 查看详情

rocketmq源码—九rocketmq延时消息(代码片段)

...式,都是通过设置消息被投递的时间来实现的,但是ApacheRocketMQ在版本4.2.0中尚不支持指定时间的 查看详情

rabbitmq实现延时队列(代码片段)

...费.RabbitMQ没有提供延迟队列功能,但是可以使用TTL+DLX来实现延迟队列效果使用场景电商平台下单后,30分钟未支付,取消订单回滚库存;新用户注册成功一周后,发送问候短信等等.延时队列实现模拟电商平台下单后,30分钟后未支付,... 查看详情

rabbitmq实现延时队列-springboot版本(代码片段)

 rabbitmq本身没有实现延时队列,但是可以通过死信队列机制,自己实现延时队列; 原理:当队列中的消息超时成为死信后,会把消息死信重新发送到配置好的交换机中,然后分发到真实的消费队列;步骤:1、创建带有时... 查看详情

golang快速接入rocketmq实现消息队列业务(代码片段)

...使用redis中的list数据类型,在大规模业务中我们可以引入rocketmq等,尤其在业务重构时需要将原有的redis消息队列实现改成rocketmq,为了保证既有业务不受影响,不引入新的BUG,是一件非常苦恼的事情;今天就介绍一个包,可以方... 查看详情

rabbitmq安装延时队列插件实现延时队列(代码片段)

下载插件地址要注意和自己的rabbitmq的版本对应起来https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases我的mq是docker安装的3.9.7的下载完之后把插件copy到mq的plugin目录下,然后启用插件。之后重启容器,我这里是docker-com... 查看详情

springbootrabbitmq集成,延时消息队列实现

...第一部分为创建普通消息队列,第二部分为延时消息队列实现: springboot提供对mq消息队列支持amqp相关包,引入即可: [html] viewplain copy <!-- rabbit mq -->    查看详情

rocketmq事务消息

RocketMQ事务消息在实现上充分利用了RocketMQ本身机制,在实现零依赖的基础上,同样实现了高性能、可扩展、全异步等一系列特性。在具体实现上,RocketMQ通过使用HalfTopic以及OperationTopic两个内部队列来存储事务消息推进状态,如... 查看详情

rocketmq(十五)延时消息(代码片段)

...时长后才可被消费处理的消息,称为延时消息。采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票... 查看详情