rabbitmq:伪延时队列(代码片段)

kelin- kelin-     2022-12-22     342

关键词:

目录

ps:伪延时队列先卖个关子,我们先了解下延时队列。

一、什么是延时队列

 所谓延时队列是指消息push到队列后,监听的消费者不能第一时间获取消息,需要等到指定时间才能消费。
 一般在业务里面需要对某些消息做定时发送,不想走定时任务或者是用户下单之后多长时间自动失效类似的场景可以考虑通过延时队列实现。

二、RabbitMQ实现

MQ本身并不支持直接的延时队列实现,但是我们可以通过RabbitMQ的消息TTL和Dead Letter规则来实现

  1. Time TO Live (TTL):
    RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间

  2. Dead Letter 死信
    RabbitMQ官网这样定义死信消息:
  • . 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • . 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)
  1. Dead Letter Exchanges(DLX)死信交换机
    MQ默认的死信消息是丢弃的,但是我们可以通过设置以下两个属性让死信消息转发到我们指定的队列。
  • x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
  • x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
  1. 延时队列实现:
    了解了MQ队列的TTL和Dead Letter之后,我们就可以通过这两个特性来实现,首先我们通过设置消息或者队列的TTL来设置消息在指定时间后成为死信,再设置死信消息的路由转发规则到特定队列,消费者通过监听这个特定队列就能实现延时队列的效果。

  2. 代码实现

生产者发送消息:ttlQueue存放过期时间的队列,deadLetterQueue死信转发队列,seconds是过期时间

 public static void sendTTLMsg(String ttlQueue, String deadLetterQueue, Object msg, Integer seconds) 
        MqSender.getInstance().setHost(RABBIT_MQ_HOST);
        // 获取到连接以及MQ通道
        Connection connection;
        try 
            connection = MqSender.getInstance().newConnection();
            // 从连接中创建通道
            Channel channel = connection.createChannel();
            // 配置
            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-dead-letter-exchange", "");
            args.put("x-dead-letter-routing-key", deadLetterQueue);
            channel.queueDeclare(deadLetterQueue, true, false, false, null);
            channel.queueDeclare(ttlQueue, true, false, false, args);
            // 发送消息
            channel.basicPublish("", ttlQueue, new AMQP.BasicProperties.Builder().expiration(String.valueOf(seconds)).build(), MAPPER.writeValueAsBytes(msg));
            channel.close();
            connection.close();
         catch (IOException e) 
            e.printStackTrace();
        
    

消费者通过监听deadLetterQueue来实现延时消息监听

三、 延时队列的问题

通过我们测试发现,这种方式实现的延时队列,在队列设置TTL的情况下是可以正常的,但是如果根据消息设置了不同的TTL,就会有问题,因为MQ本质上还是消息队列中间件,队列是遵循先进先出的,如果有两个消息先后入队,但是后入队的消息TTL小于前面的消息,它必须等待之前的消息被消费完后才能挪到队列头部,这样不同延时消息就会出现问题。

通过RabbitMQ官网的文档也介绍了这个问题:

 Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered)

所以我才称之为MQ的伪延时队列,这种延时队列在消息TTL不同的情况下并不能实现真正的延时消费。

四、解决RabbitMQ的伪延时方案

既然RabbitMQ无法支持不同TTL消息的延时消费,那么如果我们要实现这种功能,有什么方案呢,在实际业务开发中,我们有这样的解决方案:

首先我们会创建多级延时消费队列(比如两分钟,三十分钟,一天三种,具体可以根据业务量和访问量还有时间精确度来划分,这里的两分钟、三十分钟是指队列统一的TTL),push消费队列的时候,会根据需要延时的时间,丢到不同的消费队列,比如小于三十分钟的我们push到两分钟队列,三十分钟到一天的放入三十分钟队列,超过一天的放入一天队列,在死信队列的监听器做同样的判断,如果是小于等于当前时间消息的,立马消费,否则按照上述规则继续循环到不同的延时队列

这种方案解决了多级延时消费的问题,并且能够较大程度地避免了消息的重复循环,降低MQ的压力,但是缺点也比较明显,因为最低是两分钟的延时,理论上来说最多会有两分钟的误差,如果对时间要求性比较高的,可以适当调低最低一级别的延时TTL,比如一分钟或者三十秒

类似代码如下:cts是需要消费掉的时间戳

 long now = System.currentTimeMillis();
        long cts = Long.valueOf(feedComment.getCts());
        if (cts - now <= 30 * 60 * 1000) 
            MqSender.sendTTLMsg(MqConstants.FEED_COMMENT_DELAY_QUEUE_2MIN, MqConstants.FEED_COMMENT_AUTO_POST_QUEUE, feedComment, 2 * 60);
         else if (cts - now <= 24 * 60 * 60 * 1000) 
            MqSender.sendTTLMsg(MqConstants.FEED_COMMENT_DELAY_QUEUE_30MIN, MqConstants.FEED_COMMENT_AUTO_POST_QUEUE, feedComment, 30 * 60);
         else 
            MqSender.sendTTLMsg(MqConstants.FEED_COMMENT_DELAY_QUEUE_24HOUR, MqConstants.FEED_COMMENT_AUTO_POST_QUEUE, feedComment, 24 * 60 * 60);
        




rabbitmq安装延时队列插件实现延时队列(代码片段)

下载插件地址要注意和自己的rabbitmq的版本对应起来https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases我的mq是docker安装的3.9.7的下载完之后把插件copy到mq的plugin目录下,然后启用插件。之后重启容器,我这里是docker-com... 查看详情

rabbitmq实现延时队列(代码片段)

...列后不会立即被消费,可以被延迟一定的时间,再进行消费.RabbitMQ没有提供延迟队列功能,但是可以使用TTL+DLX来实现延迟队列效果使用场景电商平台下单后,30分钟未支付,取消订单回滚库存;新用户注册成功一周后,发送问候短信等... 查看详情

rabbitmq实现延时队列-springboot版本(代码片段)

 rabbitmq本身没有实现延时队列,但是可以通过死信队列机制,自己实现延时队列; 原理:当队列中的消息超时成为死信后,会把消息死信重新发送到配置好的交换机中,然后分发到真实的消费队列;步骤:1、创建带有时... 查看详情

08-rabbitmq-springboot-延时队列(代码片段)

一、springBoot整合RabbitMQ1、IDEA创建一个SpringBoot的项目2、导入相关的依赖<!--导入依赖--><dependencies><!--RabbitMQ依赖--><dependency><groupId>org.springframework.boot</groupId><artifactI 查看详情

rabbitmq学习--延迟队列的学习(代码片段)

...队列就是用来存放需要在指定时间被处理的元素的队列。RabbitMQ中的延迟队列TTL是什么?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者 查看详情

rabbitmq学习--延迟队列的学习(代码片段)

...队列就是用来存放需要在指定时间被处理的元素的队列。RabbitMQ中的延迟队列TTL是什么?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者 查看详情

springcloudgradle父子项目微服务框架搭建---rabbitmq延时队列(代码片段)

总目录https://preparedata.blog.csdn.net/article/details/120062997文章目录总目录一、rabbit延时插件下载二、rabbit插件安装三、项目中配置延时队列四、定义消息通道五、生成消息六、监听消息,进行消费延时队列的配置是对上片文章的... 查看详情

利用rabbitmq的死信队列实现延时消息(代码片段)

mq基本的消息模型mq死信队列的消息模型简单的说就是先弄一个正常队列,然后不要设置消费者,接着给这个正常队列绑定一个死信队列,这个死信队列设置方式和正常队列没啥区别。然后监听这个死信队列的消费.一... 查看详情

rabbitmq---延迟队列,整合springboot(代码片段)

RabbitMQ---消息队列---下半部分延迟队列概念使用场景RabbitMQ中的TTL队列设置TTL消息设置TTL两者的区别整合springbootpom文件配置文件添加Swagger配置类队列TTL代码架构代码实现延时队列优化代码架构图实现Rabbitmq插件实现延迟队列安装... 查看详情

rabbitmq延迟队列(代码片段)

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队... 查看详情

通过rabbitmq的direct模式以及死信队列实现延时任务(代码片段)

运用RabbitMQ的DIRECT模式以及死信队列实现延时操作以及不同间隔时间后重试一、原理描述图解:一条绑定路由为【FOR_QUEUE1】的消息被发送到交换机【EXCHANGE】上RabbitTemplate.convertSendAndReceive("EXCHANGE","FOR_QUEUE1","我... 查看详情

rabbitmq安装部署(new)&延时队列使用(代码片段)

RabbitMQ安装部署(New)elang环境与MQ版本一定要对应,否则无法启动,Rabbit版本与插件版本一定要对应,负责无法加载插件版本信息centos7.3.0erlangVersion:23.0.2,Release:2.el7RabbitMQ3.8.0-1rabbitmq_delayed_message_ 查看详情

rabbitmq延迟队列(代码片段)

一、说明在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列。相信通过上一篇的学习,对于死信队列已经有了更多的了解,这一篇的内容也跟死信队列息息相关,如果你... 查看详情

rabbitmq延时队列实现定时任务(代码片段)

场景实际业务中对于定时任务的需求是不可避免的,例如,订单超时自动取消、每天定时拉取数据等,在Node.js中系统层面提供了setTimeout、setInterval两个API或通过node-schedule这种第三方库来实现。通过这种方式实现对于简单的定时... 查看详情

rabbitmq的延时重试队列(代码片段)

​1.背景通过上文学习知道了死信队列,如果只是网络抖动,出现异常那么直接进入死信队列,那么是不合理的。这就可以使用延时重试队列,本文将介绍如何实现延时重试队列。2.原理图是俺在网上找的,请... 查看详情

rabbitmq——延迟队列的概念理解及应用举例(代码片段)

1.延迟队列——概念理解延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间... 查看详情

rabbitmq的延时队列的使用(代码片段)

配置:spring:rabbitmq:addresses:192.168.108.128:5672connection-timeout:15000username:guestpassword:guestpublisher-confirms:truepublisher-returns:true依赖:<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp<... 查看详情

rabbitmq的死信队列和延时队列(代码片段)

RabbitMQ死信队列和延时队列​RabbitMQ本身是具有死信队列和死信交换机属性的,延时队列可以通过死信队列和死信交换机来实现。在电商行业中,通常都会有一个需求:订单超时未支付,自动取消该订单。那么通过... 查看详情