基于rabbitmq实现分布式延时任务调度(代码片段)

yhcjhun yhcjhun     2023-05-06     354

关键词:

一.分布式延时任务

传统做法是将延时任务插入数据库,使用定时去扫描,比对任务是否到期,到期则执行并设置任务状态为完成。这种做法在分布式环境下还需要对定时扫描做特殊处理(加分布式锁)避免任务被重复执行。

然而使用RabbitMQ实现延时任务可以天然解决分布式环境下重复执行的问题(利用mq中消息只会被一个消费者消费这一特性可以让延时任务只会被一个消费者执行)。基于RabbitMQ做延时任务的核心是利用RabbitMQ的消息到期转发特性。发送消息时设置消息到期时间,等消息到期未被消费时会将消息转发到一个新的队列,新队列的消费者收到消息后再处理,利用这种时间差特性实现任务的延时触发。

二.准备RabbitMQ并设置延时任务用到的相关队列

1.安装erlang和RabbitMQ(注意erlang与RabbitMQ的版本对应关系)

2.开启rabbitmq_management

打开RabbitMQ Command Prompt输入命令:rabbitmq-plugins enable rabbitmq_management

3.创建两个Exchange

创建一个Exchange用于添加延时任务,相关配置如下

技术图片

 

 再创建一个Exchange用于接收到期的延时任务,相关配置如下

技术图片

 

4.创建两个Queue

创建第一个Queue,用于添加延时任务,相关配置如下

技术图片

 

上面配置创建了一个队列q1,设置到期消息被转移的目的地Exchange(dlx)和Route key(dlx_rk)

接下来配置q1绑定的Exchange为ExQ1,Route key为send

 技术图片

 

 再创建第二个Queue,用于接收队列q1中到期被转移的任务,相关配置如下

技术图片

 

并绑定到Exchange:dlx,Route key:dlx_rk

 技术图片

通过上面两个Exchange和两个Queue的配置,让RabbitMQ支持q1中的消息到期后转移到q2中。所以业务上我们只用将延时任务发送到q1,让任务到期触发执行的业务代码去监听(消费)q2。这样基本上就实现了分布式环境下延时任务的创建以及到期调度触发执行。

三.具体代码实现

1.创建简单maven项目,添加如下依赖

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
    </dependencies>

2.封装用到的RabbitMQ操作

 1 import com.rabbitmq.client.*;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 /**
 7  * 1.连接RabbitMQ
 8  * 2.添加延时任务
 9  * 3.消费延时任务
10  */
11 public class RabbitMQUtil 
12 
13     private static Connection conn;
14     private static Channel channel;
15 
16     /**
17      * 初始化RabbitMQ连接与channel
18      */
19     static 
20         ConnectionFactory factory = new ConnectionFactory();
21         factory.setUsername("guest");
22         factory.setPassword("guest");
23         factory.setVirtualHost("/");
24         factory.setHost("localhost");
25         factory.setPort(5672);
26 
27         try 
28             conn = factory.newConnection();
29             channel = conn.createChannel();
30          catch (IOException e) 
31             System.out.println("获取RabbitMQ连接失败");
32          catch (TimeoutException e) 
33             System.out.println("获取RabbitMQ连接超时");
34         
35     
36 
37 //    public static void close() throws IOException, TimeoutException 
38 //        if (Objects.nonNull(channel))
39 //            channel.close();
40 //        if (Objects.nonNull(conn))
41 //            conn.close();
42 //    
43 
44     /**
45      * 向指定exchange下route key发送延时任务
46      * @param msg 延时任务JSON bytes
47      * @param exchangeName
48      * @param routingKey
49      * @param expiration 延时时间
50      */
51     public static void addTask(byte[] msg, String exchangeName, String routingKey, int expiration) 
52         try 
53             channel.basicPublish(exchangeName, routingKey,
54                     new AMQP.BasicProperties.Builder()
55                             .expiration(String.valueOf(expiration))
56                             .build(), msg);
57          catch (IOException e) 
58             e.printStackTrace();
59         
60     
61 
62     /**
63      * 消费指定queue的消息(延时任务)
64      * @param queueName
65      * @param handler 任务处理器
66      * @param consumerTag 消费者标签(多个消费者同时消息同一queue时可以使用consumerTag作区分)
67      */
68     public static void bindConsumer(String queueName, DemoTaskHandler handler, String consumerTag) 
69         try 
70             channel.basicConsume(queueName, false, consumerTag,
71                     new DefaultConsumer(channel) 
72                         @Override
73                         public void handleDelivery(String consumerTag,
74                                                    Envelope envelope,
75                                                    AMQP.BasicProperties properties,
76                                                    byte[] body)
77                                 throws IOException 
78                             long deliveryTag = envelope.getDeliveryTag();
79                             // (process the message components here ...)
80                             handler.execute(body, consumerTag);
81                             channel.basicAck(deliveryTag, false); // 应答,告知queue成功收到消息
82                         
83                     );
84          catch (IOException e) 
85             e.printStackTrace();
86         
87     
88 
89 

3.模拟延时任务POJO

 1 import java.io.Serializable;
 2 
 3 public class DemoTask implements Serializable 
 4 
 5     private int id;
 6 
 7     public int getId() 
 8         return id;
 9     
10 
11     public void setId(int id) 
12         this.id = id;
13     
14 

4.延时任务处理器

1 import com.alibaba.fastjson.JSON;
2 
3 public class DemoTaskHandler 
4 
5     public void execute(byte[] body, String consumerTag) 
6         DemoTask task = JSON.parseObject(new String(body), DemoTask.class);
7         System.out.println(consumerTag + "收到延时任务id:" + task.getId() + " 并处理完毕");
8     
9 

5.设计一个主程序往q1队列发送延时任务

 1 import com.alibaba.fastjson.JSON;
 2 
 3 import java.util.Scanner;
 4 
 5 public class Producer 
 6 
 7     public static void main(String[] args) 
 8         // 添加延时任务
 9         System.out.println("按下键盘添加延时任务");
10         Scanner sc = new Scanner(System.in);
11         int i = 1;
12         while (sc.hasNextLine()) 
13             sc.nextLine();
14             DemoTask bo = new DemoTask();
15             bo.setId(i++);
16             RabbitMQUtil.addTask(JSON.toJSONString(bo).getBytes(),
17                     "ExQ1",
18                     "send",
19                     10000);
20             System.out.println("成功添加一个延时任务");
21         
22     
23 
24 

6.创建两个消费者(处理延时任务的业务)消费延时任务,模拟分布式环境

 1 public class Consumer1 
 2 
 3     public static void main(String[] args) 
 4         // 模拟分布式环境,处理到期的延时任务
 5         RabbitMQUtil.bindConsumer("q2",
 6                 new DemoTaskHandler(),
 7                 "consumer1");
 8 
 9     
10 
11 
1 public class Consumer2 
2 
3     public static void main(String[] args) 
4         // 模拟分布式环境,处理到期的延时任务
5         RabbitMQUtil.bindConsumer("q2",
6                 new DemoTaskHandler(),
7                 "consumer2");
8     
9 

7.运行Producer,Consumer1,Consumer2观察结果

技术图片

 

 技术图片

 

 技术图片

 

 通过观察发现,每次发送一个延时任务后,过10秒会被consumer1或者consumer2消费,以上就基本实现了分布式延时任务调度。

 

springboot使用rabbitmq实现延时任务(代码片段)

...间隔一定的时间进行失败重试。本文基于springboot,使用rabbitmq_delayed_message_e 查看详情

分布式任务调度平台xxl-job(代码片段)

支持原文:https://www.cnblogs.com/chen-chen-chen/p/12221923.html官网地址:https://www.xuxueli.com/xxl-job/发展史第一阶段单线程调度,在Java1.5之前,基于线程的等待(sleep或wait)机制定时执行,需要开发者实现调度逻辑,单... 查看详情

基于xxl-job实现分布式任务调度的实现(代码片段)

...时我们总共有80万用户左右。经测试,通过SpringTask和分布式锁,单台机器同时开启5个线程,执行时间需要27个小时左右,即便开10个线程,需要14 查看详情

rabbitmq延时队列实现定时任务(代码片段)

场景实际业务中对于定时任务的需求是不可避免的,例如,订单超时自动取消、每天定时拉取数据等,在Node.js中系统层面提供了setTimeout、setInterval两个API或通过node-schedule这种第三方库来实现。通过这种方式实现对于简单的定时... 查看详情

通过rabbitmq的direct模式以及死信队列实现延时任务(代码片段)

运用RabbitMQ的DIRECT模式以及死信队列实现延时操作以及不同间隔时间后重试一、原理描述图解:一条绑定路由为【FOR_QUEUE1】的消息被发送到交换机【EXCHANGE】上RabbitTemplate.convertSendAndReceive("EXCHANGE","FOR_QUEUE1","我... 查看详情

分布式任务调度的解决方案(代码片段)

...时任务数量日益增多,任务也变得越来越复杂,尤其是在分布式环境下,存在多个业务系统,每个业务系统都有定时任务的需求,如果都在自身系统中调度,一方面增加业务系统的复杂度,另一方面也不方便管理,因此需要有一... 查看详情

celery异步的分布式任务调度理解(代码片段)

什么是Celery呢?Celery是一个用Python开发的异步的分布式任务调度模块。Celery本身不包含消息服务,使用第三方消息服务,也就是Broker,来传递任务,目前支持的有Rebbimq,Redis,数据库以及其他的一些比如AmazonSQS,Monogdb和IronMQ。Cele... 查看详情

rabbitmq:伪延时队列(代码片段)

目录一、什么是延时队列二、RabbitMQ实现三、延时队列的问题四、解决RabbitMQ的伪延时方案ps:伪延时队列先卖个关子,我们先了解下延时队列。一、什么是延时队列所谓延时队列是指消息push到队列后,监听的消费者不能第一时间... 查看详情

redis实战(12)-基于redis的key失效和定时任务调度实现订单超时未支付自动失效(延时队列)

概述:本系列博文所涉及的相关内容来源于debug亲自录制的实战课程:缓存中间件Redis技术入门与应用场景实战(SpringBoot2.x+抢红包系统设计与实战),感兴趣的小伙伴可以点击自行前往学习(毕竟以视频的形式来掌握技术会更快... 查看详情

延时任务-基于rediszset的完整实现(代码片段)

...;《完整实现-通过DelayQueue实现延时任务》《延时任务(二)-基于netty时间轮算法实战》这两种方法都有一个缺点:都是基于单体应用的内存的方式运行延时任务的,一旦出现单点故障,可能出现延时任务数据的丢失。所... 查看详情

rabbitmq高可用实现(代码片段)

...tmq简单介绍:  基于erlang语言开发,而erlang是一门分布式语言开发,适用于集群开发;rabbitmq自身也提 查看详情

springboot定时任务-什么是elasticjob?如何集成elasticjob实现分布式任务调度?

前文展示quartz实现基于数据库的分布式任务管理和job生命周期的控制,那在分布式场景下如何解决弹性调度、资源管控、以及作业治理等呢?针对这些功能前当当团队开发了ElasticJob,2020年5月28日ElasticJob成为ApacheShardingSphere的子... 查看详情

分布式定时任务调度框架quartz学习与实战记录完整篇(代码片段)

1.Quartz的概念Quartz就是一个基于Java实现的任务调度框架,用于执行你想要执行的任何任务。Quartz是OpenSymphony开源组织在Jobscheduling(定时调度)领域的开源项目,它可以与J2EE和J2SE应用程序相结合也可以单独使用。Q... 查看详情

分布式定时任务调度框架quartz学习与实战记录完整篇(代码片段)

1.Quartz的概念Quartz就是一个基于Java实现的任务调度框架,用于执行你想要执行的任何任务。Quartz是OpenSymphony开源组织在Jobscheduling(定时调度)领域的开源项目,它可以与J2EE和J2SE应用程序相结合也可以单独使用。Q... 查看详情

celery-分布式任务调度

...后端开发工作。所以这次就介绍一款后端开发常用的一个分布式任务调度框架–Celery。介绍Celery是Python开发的分布式任务调度模块,Celery本身不含消息服务,它使用第三方消息服务来传递任务,目前,Celery支持的... 查看详情

时间片轮转算法和优先级调度算法模拟实现(代码片段)

实验三时间片轮转算法和优先级调度算法模拟实现一、 实验任务1.设计进程控制块PCB的结构,通常应包括如下信息:进程名、进程优先数(或轮转时间片数)、进程已占用的CPU时间、进程到完成还需要的时间、进程的状态、... 查看详情

xxl-job分布式任务调度框架-源码分析-任务调度执行流程及实现原理(代码片段)

文章目录1.引言2.对调度流程的思考2.1.调度中心需要解决的问题:2.2.执行器需要解决的问题:3.调度中心流程3.1.任务配置扫描流程3.2.计算任务触发时机3.2.1.已超时5秒以上3.2.2.超时未超过5秒3.2.3.还未到触发时间3.3.任务触... 查看详情

基于rabbitmq消息延时队列方案模拟电商超时未支付订单处理场景(代码片段)

前言传统处理超时订单采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好当然传统的手法还可以再优... 查看详情