分布式延时任务解决方案

wangdaijun wangdaijun     2022-12-23     196

关键词:

在开发中,往往会遇到一些关于延时任务的需求。例如


  • 生成订单30分钟未支付,则自动取消

  • 生成订单60秒后,给用户发短信


对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别


  1. 定时任务有明确的触发时间,延时任务没有

  2. 定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期

  3. 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务


下面,我们以判断订单是否超时为例,进行方案分析

 

redis缓存


- 思路一


利用redis的zset,zset是一个有序集合,每一个元素(member)都关联了一个score,通过score排序来取集合中的值


添加元素:ZADD key score member [[score member] [score member] …]

按顺序查询元素:ZRANGE key start stop [WITHSCORES]

查询元素score:ZSCORE key member

移除元素:ZREM key member [member …]


测试如下


# 添加单个元素

 

redisZADD page_rank 10 google.com

(integer) 1

 

 

# 添加多个元素

 

redisZADD page_rank 9 baidu.com 8 bing.com

(integer) 2

 

redisZRANGE page_rank 0 -1 WITHSCORES

1) "bing.com"

2) "8"

3) "baidu.com"

4) "9"

5) "google.com"

6) "10"

 

# 查询元素的score值

redisZSCORE page_rank bing.com

"8"

 

# 移除单个元素

 

redisZREM page_rank google.com

(integer) 1

 

redisZRANGE page_rank 0 -1 WITHSCORES

1) "bing.com"

2) "8"

3) "baidu.com"

4) "9"


那么如何实现呢?我们将订单超时时间戳与订单号分别设置为score和member,系统扫描第一个元素判断是否超时,具体如下图所示


技术分享图片


实现一


package com.rjzheng.delay4;

 

import java.util.Calendar;

import java.util.Set;

 

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.Tuple;

 

public class AppTest 

    private static final String ADDR = "127.0.0.1";

    private static final int PORT = 6379;

    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);

    

    public static Jedis getJedis() 

       return jedisPool.getResource();

    

    

    //生产者,生成5个订单放进去

    public void productionDelayMessage()

        for(int i=0;i<5;i++)

            //延迟3秒

            Calendar cal1 = Calendar.getInstance();

            cal1.add(Calendar.SECOND, 3);

            int second3later = (int) (cal1.getTimeInMillis() / 1000);

            AppTest.getJedis().zadd("OrderId", second3later,"OID0000001"+i);

            System.out.println(System.currentTimeMillis()+"ms:redis生成了一个订单任务:订单ID为"+"OID0000001"+i);

        

    

    

    //消费者,取订单

    public void consumerDelayMessage()

        Jedis jedis = AppTest.getJedis();

        while(true)

            Set<Tupleitems = jedis.zrangeWithScores("OrderId", 0, 1);

            if(items == null || items.isEmpty())

                System.out.println("当前没有等待的任务");

                try 

                    Thread.sleep(500);

                 catch (InterruptedException e) 

                    // TODO Auto-generated catch block

                    e.printStackTrace();

                

                continue;

            

            int  score = (int) ((Tuple)items.toArray()[0]).getScore();

            Calendar cal = Calendar.getInstance();

            int nowSecond = (int) (cal.getTimeInMillis() / 1000);

            if(nowSecond >= score)

                String orderId = ((Tuple)items.toArray()[0]).getElement();

                jedis.zrem("OrderId", orderId);

                System.out.println(System.currentTimeMillis() +"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);

            

        

    

    

    public static void main(String[] args) 

        AppTest appTest =new AppTest();

        appTest.productionDelayMessage();

        appTest.consumerDelayMessage();

    

    


此时对应输出如下


技术分享图片


可以看到,几乎都是3秒之后,消费订单。


然而,这一版存在一个致命的硬伤,在高并发条件下,多消费者会取到同一个订单号,我们上测试代码ThreadTest


package com.rjzheng.delay4;

 

import java.util.concurrent.CountDownLatch;

 

public class ThreadTest 

    private static final int threadNum = 10;

    private static CountDownLatch cdl = new CountDownLatch(threadNum);

    static class DelayMessage implements Runnable

        public void run() 

            try 

                cdl.await();

             catch (InterruptedException e) 

                // TODO Auto-generated catch block

                e.printStackTrace();

            

            AppTest appTest =new AppTest();

            appTest.consumerDelayMessage();

        

    

    public static void main(String[] args) 

        AppTest appTest =new AppTest();

        appTest.productionDelayMessage();

        for(int i=0;i<threadNum;i++)

            new Thread(new DelayMessage()).start();

            cdl.countDown();

        

    


输出如下所示


技术分享图片


显然,出现了多个线程消费同一个资源的情况。


解决方案


(1)用分布式锁,但是用分布式锁,性能下降了,该方案不细说。


(2)对ZREM的返回值进行判断,只有大于0的时候,才消费数据,于是将consumerDelayMessage()方法里的


if(nowSecond >= score)

    String orderId = ((Tuple)items.toArray()[0]).getElement();

    jedis.zrem("OrderId", orderId);

    System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);


修改为


if(nowSecond >= score)

    String orderId = ((Tuple)items.toArray()[0]).getElement();

    Long num = jedis.zrem("OrderId", orderId);

    if( num != null && num>0)

        System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);

    


在这种修改后,重新运行ThreadTest类,发现输出正常了


- 思路二


该方案使用redis的Keyspace Notifications,中文翻译就是键空间机制,就是利用该机制可以在key失效之后,提供一个回调,实际上是redis会给客户端发送一个消息。是需要redis版本2.8以上。


实现二


在redis.conf中,加入一条配置


notify-keyspace-events Ex


运行代码如下


package com.rjzheng.delay5;

 

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPubSub;

 

public class RedisTest 

    private static final String ADDR = "127.0.0.1";

    private static final int PORT = 6379;

    private static JedisPool jedis = new JedisPool(ADDR, PORT);

    private static RedisSub sub = new RedisSub();

 

    public static void init() 

        new Thread(new Runnable() 

            public void run() 

                jedis.getResource().subscribe(sub, "[email protected]__:expired");

            

        ).start();

    

 

    public static void main(String[] args) throws InterruptedException 

        init();

        for(int i =0;i<10;i++)

            String orderId = "OID000000"+i;

            jedis.getResource().setex(orderId, 3, orderId);

            System.out.println(System.currentTimeMillis()+"ms:"+orderId+"订单生成");

        

    

    

    static class RedisSub extends JedisPubSub 

        <a href=‘http://www.jobbole.com/members/wx610506454‘>@Override</a>

        public void onMessage(String channel, String message) 

            System.out.println(System.currentTimeMillis()+"ms:"+message+"订单取消");

        

    


输出如下


技术分享图片


可以明显看到3秒过后,订单取消了


ps:redis的pub/sub机制存在一个硬伤,官网内容如下


原:Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.


翻: Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。也就是说,如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了。
因此,方案二不是太推荐。当然,如果你对可靠性要求不高,可以使用。


优缺点


优点:(1)由于使用Redis作为消息通道,消息都存储在Redis中。如果发送程序或者任务处理程序挂了,重启之后,还有重新处理数据的可能性。
(2)做集群扩展相当方便
(3)时间准确度高

缺点:(1)需要额外进行redis维护


(5)使用消息队列


我们可以采用rabbitMQ的延时队列。RabbitMQ具有以下两个特性,可以实现延迟队列


  • RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter

  • lRabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。
    结合以上两个特性,就可以模拟出延迟消息的功能,具体的,我改天再写一篇文章,这里再讲下去,篇幅太长。


优缺点


优点: 高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。


缺点:本身的易用度要依赖于rabbitMq的运维.因为要引用rabbitMq,所以复杂度和成本变高

 

 

原文链接: https://mp.weixin.qq.com/s/4RMT427vnsRezfV_s7RVGA 

订单30分钟未支付自动取消怎么实现?(代码片段)

...、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等核心知识点,欢迎star~Github地址:https://github.com/Tyson0314/Java-learning目录了解需求方案1:数据库轮询方案2:... 查看详情

celery异步,延时任务,周期任务(代码片段)

  celery中文译为芹菜,是一个分布式任务队列. 是异步的,所以能处理大量消息  最新的celery不支持windows下使用了,所以在使用pycharm安装celery模块之后,需要再安装eventlet模块才能测试运行.一.异步任务启动客户端:s1,s2要在项... 查看详情

延时任务实现方案总结(代码片段)

#引言在开发中,往往会遇到一些关于延时任务的需求。例如生成订单30分钟未支付,则自动取消生成订单60秒后,给用户发短信对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一... 查看详情

延时任务实现方案总结(代码片段)

#引言在开发中,往往会遇到一些关于延时任务的需求。例如生成订单30分钟未支付,则自动取消生成订单60秒后,给用户发短信对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一... 查看详情

1记录一个分布式延时订单收货自动处理方案(代码片段)

1、场景分布式订单服务启动了多个,里面有一个自动检查订单是否超期,然后自动收货的功能,在里面有一个调用加积分的功能,所以如果不采用分布式锁就会执行多次2、采用redisson解决这个问题<dependency><groupId>org.redi... 查看详情

采用简易的环形延时队列处理秒级定时任务的解决方案

 业务背景在稍微复杂点业务系统中,不可避免会碰到做定时任务的需求,比如淘宝的交易超时自动关闭订单、超时自动确认收货等等。对于一些定时作业比较多的系统,通常都会搭建专门的调度平台来管理,通过创建定时器... 查看详情

分布式定时任务原理以及解决方案-指定时间执行定时任务

分布式定时任务原理以及实现一、单机指定时间执行定时任务实现方式Timer运行机制ScheduledThreadPoolExecutor的运行机制原理图Leader/Follower模式Timer和ScheduledThreadPoolExucutor区别Timer是基于绝对时间,ScheduledThreadPoolExucutor基于相对时间Tim... 查看详情

java延时队列delayqueue(代码片段)

...的功能,当然这种方式只适用于单体架构,如果是集群或分布式,需要自行加分布式锁,更好的方式是利用redis或其他中间件如mq完 查看详情

分布式任务调度的解决方案(代码片段)

简介随着系统规模的发展,定时任务数量日益增多,任务也变得越来越复杂,尤其是在分布式环境下,存在多个业务系统,每个业务系统都有定时任务的需求,如果都在自身系统中调度,一方面增加业务系统的复杂度,另一方面... 查看详情

分布式任务调度平台xxl-job(代码片段)

支持原文:https://www.cnblogs.com/chen-chen-chen/p/12221923.html官网地址:https://www.xuxueli.com/xxl-job/发展史第一阶段单线程调度,在Java1.5之前,基于线程的等待(sleep或wait)机制定时执行,需要开发者实现调度逻辑,单... 查看详情

分布式任务调度的解决方案(代码片段)

1.简介随着系统规模的发展,定时任务数量日益增多,任务也变得越来越复杂,尤其是在分布式环境下,存在多个业务系统,每个业务系统都有定时任务的需求,如果都在自身系统中调度,一方面增加业务系统的复杂度,另一方... 查看详情

一口气说出6种实现延时消息的方案,还有谁不会?!(代码片段)

...n/post/6844904150703013901延时消息(定时消息)指的在分布式异步消息场景下,生产端发送一条消息,希望在指定延时或者指定时间点被消费端消费到,而不是立刻被消费。延时消息适用的业务场景非常的广泛࿰... 查看详情

celery时区设置问题源码探究

...定时间未能执行的情况。Google、百度了一些网友的分析及解决方案,大多认为是Celery时区设置导致的问题。然而这些解答大多类似,而且并不能解决我心中的疑惑,因此决定研究源码一探究竟。这里网上大多数解答存在问题的地... 查看详情

celery介绍(代码片段)

...ueue—Celery5.3.0b1documentationCelery官方文档中文版:Celery-分布式任务队列—Celery3.1.7文档Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统1.完成异步任务:可以提高项目的并发量,以前用多线程实现项目... 查看详情

分布式任务框架选型

...个分布式锁,只有获得锁的才能执行任务。   解决方案:redis,zookeeper,DB运维工具。日志、监控、任务配置等高可用性。保证任务能够执行,且不重复跑。用途就是在分布式环境使用,可以轻松实现。第1、3点不复杂... 查看详情

延时任务-基于rediszset的完整实现(代码片段)

所谓的延时任务给大家举个例子:你买了一张火车票,必须在30分钟之内付款,否则该订单被自动取消。「订单30分钟不付款自动取消,这个任务就是一个延时任务。」我之前已经写过2篇关于延时任务的文章:... 查看详情

简单粗暴的分布式定时任务解决方案(代码片段)

...外,那么怎么保证这个任务只执行一次呢?其实解决方案有很多。分布式任务执行出现的问题,如下图所示:使用数据库唯一约束加锁使用redis的setNX命令使用分布式框架Quartz,TBSchedule,elastic-job,Saturn... 查看详情

订单30分钟未支付自动取消怎么实现?

...用消息队列了解需求在开发中,往往会遇到一些关于延时任务的需求。例如生成订单30分钟未支付,则自动取消生成订单60秒后,给用户发短信对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么... 查看详情