原荐简单说说kafka中的时间轮算法(代码片段)

author author     2023-01-12     152

关键词:

零、时间轮定义

简单说说时间轮吧,它是一个高效的延时队列,或者说定时器。实际上现在网上对于时间轮算法的解释很多,定义也很全,这里引用一下 朱小厮博客 里出现的定义:

参考下图,Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务TimerTask。

技术分享图片

图片描述(最多50字)

如果你理解了上面的定义,那么就不必往下看了。但如果你第一次看到和我一样懵比,并且有不少疑问,那么这篇博文将带你进一步了解时间轮,甚至理解时间轮算法。

如果有兴趣,可以去看看其他的定时器 你真的了解延时队列吗 。博主认为,时间轮定时器最大的优点:

是任务的添加与移除,都是O(1)级的复杂度;
不会占用大量的资源;
只需要有一个线程去推进时间轮就可以工作了。
我们将对时间轮做层层推进的解析:

一、为什么使用环形队列

假设我们现在有一个很大的数组,专门用于存放延时任务。它的精度达到了毫秒级!那么我们的延迟任务实际上需要将定时的那个时间简单转换为毫秒即可,然后将定时任务存入其中:

比如说当前的时间是2018/10/24 19:43:45,那么就将任务存入Task[1540381425000],value则是定时任务的内容。

private Task[很长] tasks;
public List<Task> getTaskList(long timestamp)
return task.get(timestamp)

// 假装这里真的能一毫秒一个循环
public void run()
while (true)
getTaskList(System.currentTimeMillis()).后台执行()
Thread.sleep(1);


假如这个数组长度达到了亿亿级,我们确实可以这么干。 那如果将精度缩减到秒级呢?我们也需要一个百亿级长度的数组。

先不说内存够不够,显然你的定时器要这么大的内存显然很浪费。

当然如果我们自己写一个map,并保证它不存在hash冲突问题,那也是完全可行的。(我不确定我的想法是否正确,如果错误,请指出)

/ 一个精度为秒级的延时任务管理类 /
private Map<Long, Task> taskMap;
public List<Task> getTaskList(long timestamp)
return taskMap.get(timestamp - timestamp % 1000)

// 新增一个任务
public void addTask(long timestamp, Task task)
List<Task> taskList = getTaskList(timestamp - timestamp % 1000);
if (taskList == null)
taskList = new ArrayList();

taskList.add(task);

// 假装这里真的能一秒一个循环
public void run()
while (true)
getTaskList(System.currentTimeMillis()).后台执行()
Thread.sleep(1000);


其实时间轮就是一个不存在hash冲突的数据结构

抛开其他疑问,我们看看手腕上的手表(如果没有去找个钟表,或者想象一个),是不是无论当前是什么时间,总能用我们的表盘去表示它(忽略精度)

技术分享图片

图片描述(最多50字)

就拿秒表来说,它总是落在 0 - 59 秒,每走一圈,又会重新开始。

用伪代码模拟一下我们这个秒表:

private Bucket[60] buckets;// 表示60秒
public void addTask(long timestamp, Task task)
Bucket bucket = buckets[timestamp / 1000 % 60];
bucket.add(task);

public Bucket getBucket(long timestamp)
return buckets[timestamp / 1000 % 60];

// 假装这里真的能一秒一个循环
public void run()
while (true)
getBucket(System.currentTimeMillis()).后台执行()
Thread.sleep(1000);


这样,我们的时间总能落在0 - 59任意一个bucket上,就如同我们的秒钟总是落在0 - 59刻度上一样,这便是 时间轮的环形队列 。

二、表示的时间有限

但是细心的小伙伴也会发现这么一个问题:如果只能表示60秒内的定时任务应该怎么存储与取出,那是不是太有局限性了? 如果想要加入一小时后的延迟任务,该怎么办?

其实还是可以看一看钟表,对于只有三个指针的表(一般的表)来说,最大能表示12个小时,超过了12小时这个范围,时间就会产生歧义。如果我们加多几个指针呢?比如说我们有秒针,分针,时针,上下午针,天针,月针,年针...... 那不就能表示很长很长的一段时间了?而且,它并不需要占用很大的内存。

比如说秒针我们可以用一个长度为60的数组来表示,分针也同样可以用一个长度为60的数组来表示,时针可以用一个长度为24的数组来表示。那么表示一天内的所有时间,只需要三个数组即可。

动手来做吧,我们将这个数据结构称作时间轮,tickMs表示一个刻度,比如说上面说的一秒。wheelSize表示一圈有多少个刻度,即上面说的60。interval表示一圈能表示多少时间,即 tickMs * wheelSize = 60秒。

overflowWheel表示上一层的时间轮,比如说,对于秒钟来说,overflowWheel就表示分钟,以此类推。

public class TimeWheel
/ 一个时间槽的时间 */
private long tickMs;
/* 时间轮大小 /
private int wheelSize;
/
时间跨度 */
private long interval;
/ 槽 */
private Bucket[] buckets;
/* 时间轮指针 /
private long currentTimestamp;
/
上层时间轮 /
private volatile TimeWheel overflowWheel;
public TimeWheel(long tickMs, int wheelSize, long currentTimestamp)
this.currentTimestamp = currentTimestamp;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.interval = tickMs
wheelSize;
this.buckets = new Bucket[wheelSize];
this.currentTimestamp = currentTimestamp - (currentTimestamp % tickMs);
for (int i = 0; i < wheelSize; i++)
buckets[i] = new Bucket();



将任务添加到时间轮中十分简单,对于每个时间轮来说,比如说秒级时间轮,和分级时间轮,都有它自己的过期槽。也就是delayMs < tickMs的时候。

添加延时任务的时候一共就这几种情况:

####一、时间到期

1)比如说有一个任务要在 16:29:07 执行,从秒级时间轮中来看,当我们的当前时间走到16:29:06的时候,则表示这个任务已经过期了。因为它的delayMs = 1000ms,小于了我们的秒级时间轮的tickMs(1000ms)。
比如说有一个任务要在 16:41:25 执行,从分级时间轮中来看,当我们的当前时间走到 16:41的时候( 分级时间轮没有秒针!它的最小精度是分钟(一定要理解这一点) ),则表示这个任务已经到期,因为它的delayMs = 25000ms,小于了我们的分级时间轮的tickMs(60000ms)。
二、时间未到期,且delayMs小于interval。

对于秒级时间轮来说,就是延迟时间小于60s,那么肯定能找到一个秒钟槽扔进去。

三、时间未到期,且delayMs大于interval。

对于妙级时间轮来说,就是延迟时间大于等于60s,这时候就需要借助上层时间轮的力量了,很简单的代码实现,就是拿到上层时间轮,然后类似递归一样,把它扔进去。

比如说一个有一个延时为一年后的定时任务,就会在这个递归中不断创建更上层的时间轮,直到找到满足delayMs小于interval的那个时间轮。

这里为了不把代码写的那么复杂,我们每一层时间轮的刻度都一样,也就是秒级时间轮表示60秒,上面则表示60分钟,再上面则表示60小时,再上层则表示60个60小时,再上层则表示60个60个60小时 = 216000小时。

也就是如果将最底层时间轮的tickMs(精度)设置为1000ms。wheelSize设置为60。 那么只需要5层时间轮,可表示的时间跨度已经长达24年(216000小时) 。

/**

  • 添加任务到某个时间轮
    */
    public boolean addTask(TimedTask timedTask)
    long expireTimestamp = timedTask.getExpireTimestamp();
    long delayMs = expireTimestamp - currentTimestamp;
    if (delayMs < tickMs) // 到期了
    return false;
    else
    // 扔进当前时间轮的某个槽中,只有时间【大于某个槽】,才会放进去
    if (delayMs < interval)
    int bucketIndex = (int) (((delayMs + currentTimestamp) / tickMs) % wheelSize);
    Bucket bucket = buckets[bucketIndex];
    bucket.addTask(timedTask);
    else
    // 当maybeInThisBucket大于等于wheelSize时,需要将它扔到上一层的时间轮
    TimeWheel timeWheel = getOverflowWheel();
    timeWheel.addTask(timedTask);


    return true;

    /**
  • 获取或创建一个上层时间轮
    */
    private TimeWheel getOverflowWheel()
    if (overflowWheel == null)
    synchronized (this)
    if (overflowWheel == null)
    overflowWheel = new TimeWheel(interval, wheelSize, currentTimestamp, delayQueue);



    return overflowWheel;

    当然我们的时间轮还需要一个指针的推进机制,总不能让时间永远停留在当前吧?推进的时候,同时类似递归,去推进一下上一层的时间轮。

    注意:要强调一点的是,我们这个时间轮更像是电子表,它不存在时间的中间状态,也就是精度这个概念一定要理解好。比如说,对于秒级时间轮来说,它的精度只能保证到1秒,小于1秒的,都会当成是已到期

    对于分级时间轮来说,它的精度只能保证到1分,小于1分的,都会当成是已到期

/**

  • 尝试推进一下指针
    */
    public void advanceClock(long timestamp)
    if (timestamp >= currentTimestamp + tickMs)
    currentTimestamp = timestamp - (timestamp % tickMs);
    if (overflowWheel != null)
    this.getOverflowWheel()
    .advanceClock(timestamp);



    三、对于高层时间轮来说,精度越来越不准,会不会有影响?

    上面说到,分级时间轮,精度只有分钟级,总不能延迟1秒的定时任务和延迟59秒的定时任务同时执行吧?

    有这个疑问的同学很好!实际上很好解决,只需再入时间轮即可。比如说,对于分钟级时间轮来说,delayMs为1秒和delayMs为59秒的都已经过期,我们将其取出,再扔进底层的时间轮不就可以了?

    1秒的会被扔到秒级时间轮的下一个执行槽中,而59秒的会被扔到秒级时间轮的后59个时间槽中。

    细心的同学会发现,我们的添加任务方法,返回的是一个bool

public boolean addTask(TimedTask timedTask)
再倒回去好好看看,添加到最底层时间轮失败的(我们只能直接操作最底层的时间轮,不能直接操作上层的时间轮),是不是会直接返回flase? 对于再入失败的任务,我们直接执行即可。

/**

  • 将任务添加到时间轮
    */
    public void addOrSubmitTask(TimedTask timedTask)
    if (!timeWheel.addTask(timedTask))
    taskExecutor.submit(timedTask.getTask());


    四、如何知道一个任务已经过期?

    记得我们将任务存储在槽中嘛?比如说秒级时间轮中,有60个槽,那么一共有60个槽。如果时间轮共有两层,也仅仅只有120个槽。我们只需将槽扔进一个delayedQueue之中即可。

    我们轮询地从delayedQueue取出已经过期的槽即可。(前面的所有代码,为了简单说明,并没有引入这个DelayQueue的概念,所以不用去上面翻了,并没有。博主觉得... 已经看到这里了,应该很明白这个DelayQueue的意义了。 )

    其实简单来说,实际上定时任务单单使用DelayQueue来实现,也是可以的,但是一旦任务的数量多了起来,达到了百万级,千万级,针对这个delayQueue的增删,将非常的慢。

    一、面向槽的delayQueue

    而对于时间轮来说,它只需要往delayQueue里面扔各种槽即可,比如我们的定时任务长短不一,最长的跨度到了24年,这个delayQueue也仅仅只有300个元素。

    二、处理过期的槽

    而这个槽到期后,也就是被我们从delayQueue中poll出来后,我们只需要将槽中的所有任务循环一次,重新加到新的槽中(添加失败则直接执行)即可。

/**

  • 推进一下时间轮的指针,并且将delayQueue中的任务取出来再重新扔进去
    */
    public void advanceClock(long timeout)
    try
    Bucket bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
    if (bucket != null)
    timeWheel.advanceClock(bucket.getExpire());
    bucket.flush(this::addTask);

    catch (Exception e)
    e.printStackTrace();

定时任务的实现原理:时间轮算法(代码片段)

...eThreadPool,时间轮。在对比的同时,也了解了下其简单原理&#x 查看详情

实现一个简单的加权轮询算法(代码片段)

前言上一篇给大家简单介绍了一下轮询算法,这一篇就来介绍一下加权轮询算法。既然有了轮询算法的基础,那么对于加权轮询的理解就简单多了。同先举个例子看看加权轮询算法是如何运作的。假设我们的API站点有2台负载(10.0... 查看详情

定时任务的实现原理:时间轮算法(代码片段)

...eThreadPool,时间轮。在对比的同时,也了解了下其简单原理,在这里描述下我对时间轮算法实现定时任务的理解。时间轮定时使用方式@Testpublicvoidtest3()throwsInterruptedExceptionDateTimeFormatterformatter=DateTimeFormatter.ofPatter... 查看详情

解惑“高深”的kafka时间轮原理,原来也就这么回事!

...轮是Kafka实现高效的延时任务的基础,它模拟了现实生活中的钟表对时间的表示方式,同时,时间轮的方式并不仅限于Kafka,它是一种通用的时间表示方式,本文主要介绍Kafka中的时间轮原理。Kafka中存在一些定时任务(DelayedOperatio... 查看详情

如何使用netty技术设计一个百万级的消息推送系统原荐(代码片段)

先简单说下本次的主题,由于我最近做的是物联网相关的开发工作,其中就不免会遇到和设备的交互。最主要的工作就是要有一个系统来支持设备的接入、向设备推送消息;同时还得满足大量设备接入的需求。所以本次分享的内... 查看详情

.net之时间轮算法(终极版)定时任务(代码片段)

TimeWheelDemo一个基于时间轮原理的定时任务对时间轮的理解其实我是有一篇文章(.Net之时间轮算法(终极版)[1])针对时间轮的理论理解的,但是,我想,为啥我看完时间轮原理后,会采用这样的方式去实现。可能只是... 查看详情

6张图阐述kafka心跳机制(时间轮算法的具体运用)

Broker端与客户端的心跳在Kafka中非常的重要,因为一旦在一个心跳过期周期内(默认10s),Broker端的消费组组协调器(GroupCoordinator)会把消费者从消费组中移除,从而触发重平衡。在2.4.x以下其版本中,消费组一旦进入... 查看详情

算法数据结构专题「延时队列算法」史上手把手教你针对层级时间轮(timingwheel)实现延时队列的开发实战落地(下)(代码片段)

...最后,在告诉大家一下,其实时间轮的技术是来源于生活中的时钟。时间轮演示结构总览无序列表时间轮【无序列表时间轮】主要是由LinkedList链表和启动线程、终止线程实现。遍历定时器中所有节点,将剩余时间为0s的任务进行... 查看详情

卡特兰数(简单说说)(代码片段)

参考题解:【算法】震惊!!!史上最详细的卡特兰数浅谈!!!卡特兰数(好像很有用的说)介绍卡特兰数是组合数学中一种著名数列,其前几项为:1,2,5,14,42,132,429,1430,4862,16796,58786,20... 查看详情

实现一个简单的轮询算法(代码片段)

前言负载均衡,大家应该听的也不少了,也实践过N次了。当然也会知道,负载均衡是有不少算法的:随机,轮询,加权轮询,最少连接。。。。本文重点说的是轮询。先举个例子看看轮询算法是如何运作的。假设我们的API站点... 查看详情

6张图阐述kafka心跳机制(时间轮算法的具体运用)

...如下:源码解读Kafka心跳机制Kafka心跳架构设计亮点(时间轮调度算法实现原理图)温馨提示:如果大家对源码阅读不感兴趣,可以直接跳到本文的第二部分,用流程图、数据结构图阐述心跳的实现机制。1、源码分析... 查看详情

6张图阐述kafka心跳机制(时间轮算法的具体运用)

...如下:源码解读Kafka心跳机制Kafka心跳架构设计亮点(时间轮调度算法实现原理图)温馨提示:如果大家对源码阅读不感兴趣,可以直接跳到本文的第二部分,用流程图、数据结构图阐述心跳的实现机制。1、源码分析... 查看详情

算法求解思路培养-2.八皇后问题(说说递归)(代码片段)

按道理来说,递归是很简单的。例如求斐波那契数的公式,fibonaci(N)=fibonaci(N-1)+fibonaci(N-2)不复杂吧,再比方,阶乘公式:Factorial(N)=N*Factorial(N-1)有啥难的呢?但真正在面试、比赛或者工作中遇到的题目是这样的。1、八皇后问题(... 查看详情

算法之权重轮询算法(代码片段)

实现:packagetransformimport( "fmt" "strings" "sync" "project/pkg/base/log")//权重轮询调度算法//每一个服务定义typeRecvServerstruct//nolint Weightint currentWeightint RecvAddrstring//通过权重 查看详情

简单说说python与go的区别(代码片段)

背景工作中的主力语言是Python,今年要搞性能测试的工具,由于GIL锁的原因,Python的性能实在是惨淡,需要学一门性能高的语言来生成性能测试的压力端。因此我把目光放在了现在的新秀Go。经过一段时间的学习&#... 查看详情

时间轮原理及其在框架中的应用(代码片段)

在平时开发中,经常会与定时任务打交道。时间轮以其精妙的设计,可以被用来高效的处理定时任务。一、时间轮简介1.1为什么要使用时间轮在平时开发中,经常会与定时任务打交道。下面举几个定时任务处理的例子。1)心跳... 查看详情

时间轮原理及其在框架中的应用(代码片段)

作者:vivo互联网服务器团队-LiWanghong一、时间轮简介1.1为什么要使用时间轮在平时开发中,经常会与定时任务打交道。下面举几个定时任务处理的例子。1)心跳检测。在Dubbo中,需要有心跳机制来维持Consumer与Provider的长连接,... 查看详情

手写轮询算法(代码片段)

本次的手写轮询算法是跟着周阳老师完成的,这里做一下记录文章目录一、LoadBalancer接口二、LoadBalancer接口实现类三、生产者微服务消费者代码结果展示一、LoadBalancer接口packagecom.atguigu.springcloud.lb;importorg.springframework.cloud.clie... 查看详情