rabbitmq延时队列实现定时任务(代码片段)

xiaosongjiang xiaosongjiang     2022-12-15     509

关键词:

场景

实际业务中对于定时任务的需求是不可避免的,例如,订单超时自动取消、每天定时拉取数据等,在Node.js中系统层面提供了setTimeout、setInterval两个API或通过node-schedule这种第三方库来实现。通过这种方式实现对于简单的定时任务是ok的,过于复杂的、可用性要求较高的系统就会存在以下缺点。

存在的一些问题

  • 消耗系统内存,如果定时任务很多,长时间得不到释放,将会一直占用系统进程耗费内存。
  • 单线程如何保障出现系统崩溃后之前的定时任务不受影响?多进程集群模式下一致性的保证?
  • setTimeout、setInterval会存在时间误差,对于时间精度要求较高的是不行的。

RabbitMQ TTL+DLX 实现定时任务

RabbitMQ本身是不支持的,可以通过它提供的两个特性Time-To-Live and ExpirationDead Letter Exchanges来实现,通过以下泳道图可以看到一个消息从发布到消费的整个过程。

技术图片

死信队列

死信队列全称 Dead-Letter-Exchange 简称 DLX 是 RabbitMQ 中交换器的一种类型,消息在一段时间之后没有被消费就会变成死信被重新 publish 到另一个 DLX 交换器队列中,因此称为死信队列。
  • 死信队列产生几种情况

    • 消息被拒绝
    • 消息TTL过期
    • 队列达到最大长度
  • 设置DLX的两个参数:

    • deadLetterExchange: 设置DLX,当正常队列的消息成为死信后会被路由到DLX中
    • deadLetterRoutingKey: 设置DLX指定的路由键

注意:Dead-Letter-Exchange也是一种普通的Exchange

消息TTL

消息的TTL指的是消息的存活时间,RabbitMQ支持消息、队列两种方式设置TTL,分别如下:

  • 消息设置TTL:对消息的设置是在发送时进行TTL设置,通过x-message-ttlexpiration 字段设置,单位为毫秒,代表消息的过期时间,每条消息的TTL可不同。

  • 队列设置TTL:对队列的设置是在消息入队列时计算,通过 x-expires 设置,队列中的所有消息都有相同的过期时间,当超过了队列的超时设置,消息会自动的清除。

注意:如果以上两种方式都做了设置,消息的TTL则以两者之中最小的那个为准。

Nodejs操作RabbitMQ实现延迟队列

推荐采用 amqplib库,一个Node.js实现的RabbitMQ客户端。

初始化RabbitMQ

const amqp = require(‘amqplib‘);
const log4js = require(‘log4js‘);
const logger = log4js.getLogger();
logger.level = ‘info‘;
module.exports = 
  logger,
  init: () =>
    amqp.connect(‘amqp://122.51.9.11:5672‘).then((connection) => 
      logger.info(‘rabbitmq connect success‘);
      return connection;
    ),
;

生产者

const random = require(‘string-random‘);
const rabbitmq = require(‘./rabbitmq.js‘);
const logger = rabbitmq.logger;
const sleep = (time) => new Promise((resolve) => setTimeout(resolve, time));
async function producerDLX(connnection) 
  const testQueue = ‘testQu‘;
  const testExchange = ‘testEx‘;
  const testRoutingKey = ‘testRoutingKey‘;
  const testExchangeDLX = ‘testExDLX‘;
  const testRoutingKeyDLX = ‘testRoutingKeyDLX‘;
  const msg = ‘Producer‘;

  const ch = await connnection.createChannel();
  await ch.assertExchange(testExchange, ‘direct‘,  durable: true );
  const queueResult = await ch.assertQueue(testQueue, 
    exclusive: false,
    messageTtl: 10000,
    deadLetterExchange: testExchangeDLX,
    deadLetterRoutingKey: testRoutingKeyDLX,
  );
  await ch.bindQueue(queueResult.queue, testExchange, testRoutingKey);

  for (let i = 0; i < 5; i++) 
    await sleep(2000);
    const cMsg = `$i:$msg 消息 =>$random(10)`;
    logger.info(cMsg);
    await ch.publish(testExchange, testRoutingKey, Buffer.from(cMsg));
  

  await ch.close();


// 发送消息
rabbitmq.init().then((connection) => producerDLX(connection));

 消费者

const rabbitmq = require(‘./rabbitmq.js‘);
const logger = rabbitmq.logger;
/**
 * 消费一个死信队列
 * @param  Object  connnection
 */
async function consumerDLX(connnection) 
  const testExchangeDLX = ‘testExDLX‘;
  const testRoutingKeyDLX = ‘testRoutingKeyDLX‘;
  const testQueueDLX = ‘testQueueDLX‘;

  const ch = await connnection.createChannel();

  await ch.assertExchange(testExchangeDLX, ‘direct‘,  durable: true );
  const queueResult = await ch.assertQueue(testQueueDLX, 
    exclusive: false,
  );
  await ch.bindQueue(queueResult.queue, testExchangeDLX, testRoutingKeyDLX);
  await ch.consume(
    queueResult.queue,
    (msg) => 
      logger.info(‘consumer msg:‘, msg.content.toString());
    ,
     noAck: true 
  );


// 消费消息
rabbitmq.init().then((connection) => consumerDLX(connection));

分别执行消费者和生产者,可以看到 producer 在45秒发布了消息,consumer 是在55秒接收到的消息,实现了定时10秒种执行

技术图片

技术图片

 

 参考

https://www.jianshu.com/p/9ce0223aeb5e

基于rabbitmq实现分布式延时任务调度(代码片段)

...特殊处理(加分布式锁)避免任务被重复执行。然而使用RabbitMQ实现延时任务可以天然解决分布式环境下重复执行的问题(利用mq中消息只会被一个消费者消费这一特性可以让延时任务只会被一个消费者执行)。基于RabbitMQ做延时... 查看详情

通过rabbitmq的direct模式以及死信队列实现延时任务(代码片段)

运用RabbitMQ的DIRECT模式以及死信队列实现延时操作以及不同间隔时间后重试一、原理描述图解:一条绑定路由为【FOR_QUEUE1】的消息被发送到交换机【EXCHANGE】上RabbitTemplate.convertSendAndReceive("EXCHANGE","FOR_QUEUE1","我... 查看详情

rabbitmq延迟队列实现定时任务,这才是正确的方式!(代码片段)

场景开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期、订单定时关闭、微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题。一般来说我们都是通过定... 查看详情

第203天学习打卡(rabitmq延时队列实现定时任务)

RabbitMQ延时队列(实现定时任务)TTL消息存活时间MQ:保证数据最终的一致性B站学习网址:全网最强电商教程《谷粒商城》对标阿里P6/P7,40-60万年薪_哔哩哔哩_bilibili 查看详情

springboot使用rabbitmq实现延时任务(代码片段)

...间隔一定的时间进行失败重试。本文基于springboot,使用rabbitmq_delayed_message_e 查看详情

rabbitmq实现延时队列(代码片段)

...列后不会立即被消费,可以被延迟一定的时间,再进行消费.RabbitMQ没有提供延迟队列功能,但是可以使用TTL+DLX来实现延迟队列效果使用场景电商平台下单后,30分钟未支付,取消订单回滚库存;新用户注册成功一周后,发送问候短信等... 查看详情

rabbitmq安装延时队列插件实现延时队列(代码片段)

下载插件地址要注意和自己的rabbitmq的版本对应起来https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases我的mq是docker安装的3.9.7的下载完之后把插件copy到mq的plugin目录下,然后启用插件。之后重启容器,我这里是docker-com... 查看详情

rabbitmq实现延时队列-springboot版本(代码片段)

 rabbitmq本身没有实现延时队列,但是可以通过死信队列机制,自己实现延时队列; 原理:当队列中的消息超时成为死信后,会把消息死信重新发送到配置好的交换机中,然后分发到真实的消费队列;步骤:1、创建带有时... 查看详情

千万级延时任务队列如何实现,看美图开源的-lmstfy(代码片段)

千万级延时任务队列如何实现,看美图开源的-LMSTFY导读:Task是web开发中一个经典场景,我们时常需要延时任务,或者定时任务,通常都需要任务队列。常见的任务队列如celery,lmstfy是美图开源的任务队列。本文作者详细剖析了l... 查看详情

基于rabbitmq消息延时队列方案模拟电商超时未支付订单处理场景(代码片段)

前言传统处理超时订单采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好当然传统的手法还可以再优... 查看详情

rabbitmq队列延迟(代码片段)

 RabbitMQ队列延迟1.场景:“订单下单成功后,15分钟未支付自动取消”  1.传统处理超时订单    采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很... 查看详情

rabbitmq定时任务(代码片段)

写的很棒,转载一下:Rabbitmq延迟队列实现定时任务,这才是正确的方式!-Java技术栈-博客园场景开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期、订单定时关闭... 查看详情

每日一博-延时任务的多种实现方式解读(代码片段)

文章目录Pre延时任务VS定时任务SolutionsDB轮询核心思想DemoCode优缺点JDK的DelayQueue核心思想DemoCode优缺点时间轮算法核心思想DemoCode优缺点核心思想DemoCode优缺点Pre每日一博-使用环形队列实现高效的延时消息延时任务VS定时任务举个... 查看详情

rabbitmq利用消息超时和死信交换机实现定时任务

  在RabbitMQ的基础功能中,并没有定时任务或者延时任务这种功能,然而很多业务都有此类需求,但是我们可以依赖RabbitMQ的消息自动超时和死信交换机等基本的属性来实现这类需求,大致思路如下:  1.通过计算任务执行时... 查看详情

利用rabbitmq的死信队列实现延时消息(代码片段)

mq基本的消息模型mq死信队列的消息模型简单的说就是先弄一个正常队列,然后不要设置消费者,接着给这个正常队列绑定一个死信队列,这个死信队列设置方式和正常队列没啥区别。然后监听这个死信队列的消费.一... 查看详情

rabbitmq---延迟队列,整合springboot(代码片段)

RabbitMQ---消息队列---下半部分延迟队列概念使用场景RabbitMQ中的TTL队列设置TTL消息设置TTL两者的区别整合springbootpom文件配置文件添加Swagger配置类队列TTL代码架构代码实现延时队列优化代码架构图实现Rabbitmq插件实现延迟队列安装... 查看详情

django的celery配置(包括定时任务队列)(代码片段)

...先来安装它,在终端中输入:pipinstalldjango-celery二、安装rabbitmq,建立celery队列我做的项目用的就是rabbitmq,按道理来说,也是可以用redis作为消息队列的,但是rabbitmq更好,此处不做详细解释,有兴趣的同学的可以去研究下。ubuntu 查看详情

rabbitmq如何实现2小时未支付关闭订单?(代码片段)

...JDK延迟队列、时间轮等。因为我们项目中本身就使用到了Rabbitmq,所以基于方便开发和维护的原则,我们使用了Rabbitmq延迟队列来实现定时任务,不知道rab 查看详情