时间轮算法(Timing-Wheel)很早出现在linux kernel 2.6中。因效率非常高,很多应用框架都实现了这个算法。还有些定时器使用最小堆实现,但总体来说,时间轮算法在插入性能上更高。
时间轮分成多个层级,每一层是一个圈,和时钟类似
当个位的指针转完一圈到达0这个刻度之后,十位的指针转1格;当十位的转完一圈,百位的转1格,以此类推。这样就能表示很长的度数。
时间轮能表达的时间长度,和圈的数量以及每一圈的长度成正比。假设有5圈,每个圈60个刻度,每个刻度表示1毫秒,那么这个时间轮可以表示这么长:
60 x 60 x 60 x 60 x 60 = 777,600,000(ms) ~ 216小时
现在用程序表示这个指针可能这样的:
int ptr[5];
假如有一个update函数驱动时间轮运转起来,每调一次跳一格,那这个算法可能是这样的:
- ptr[0]加1,如果ptr[0]等于60,则使ptr[0]重置为0,可以写为:ptr[0] = (ptr[0] + 1) % 60
- 如果ptr[0]等于0的时候,说明转了一圈,此时ptr[1]要加1:ptr[1] = (ptr[1] + 1) % 60
- 就这样一直转到第5圈,然后又重新开始。
这样处理有点麻烦,而且还需要一个数组来表示。其实可以用一个uint32变量来划分,比如:
| 6bit | 6bit | 6bit | 6bit | 8bit |
111111 111111 111111 111111 11111111
也分5个圈,第1个占8位即256个槽位,后面4个分别占6位即64个槽位。这个变量只需不断自增就行,比如前面8位满了会变成0,后面自动进1。这样就可以定义一些宏:
// 第1个轮占的位数
#define TVR_BITS 8
// 第1个轮的长度
#define TVR_SIZE (1 << TVR_BITS)
// 第n个轮占的位数
#define TVN_BITS 6
// 第n个轮的长度
#define TVN_SIZE (1 << TVN_BITS)
// 掩码:取模或整除用
#define TVR_MASK (TVR_SIZE - 1)
#define TVN_MASK (TVN_SIZE - 1)
想取某一个圈的当前指针位置是:
// 第1个圈的当前指针位置
#define FIRST_INDEX(v) ((v) & TVR_MASK)
// 后面第n个圈的当前指针位置
#define NTH_INDEX(v, n) (((v) >> (TVR_BITS + (n) * TVN_BITS)) & TVN_MASK)
核心的数组结构是这样的:
// 第1个轮
typedef struct tvroot {
clinknode_t vec[TVR_SIZE];
} tvroot_t;
// 后面几个轮
typedef struct tvnum {
clinknode_t vec[TVN_SIZE];
} tvnum_t;
// 时间轮定时器
typedef struct timerwheel {
tvroot_t tvroot; // 第1个轮
tvnum_t tv[4]; // 后面4个轮
uint64_t lasttime; // 上一次的时间毫秒
uint32_t currtick; // 当前的tick
uint16_t interval; // 每个时间点的毫秒间隔
uint16_t remainder; // 剩余的毫秒
} timerwheel_t;
这里要重点讲一下clinknode_t,这是每一个圈的槽位的数据,它其实是一个双向循环链表:
新结点可以往head的前面加,也可以往head的后面加,相当于加到链表头和链表尾。初始情况下head的前后指针指向自己。链表中的结点就是定时器结点。
双向链表的代码如下:
#pragma once
/**
* 循环双向链表
*/
// 链表结点
typedef struct clinknode {
struct clinknode *next;
struct clinknode *prev;
} clinknode_t;
// 初始化链表头:前后都指向自己
static inline void clinklist_init(clinknode_t *head) {
head->prev = head;
head->next = head;
}
// 插入结点到链表的前面,因为是循环链表,其实是在head的后面
static inline void clinklist_add_front(clinknode_t *head, clinknode_t *node) {
node->prev = head;
node->next = head->next;
head->next->prev = node;
head->next = node;
}
// 插入结点到链表的后面,因为是循环链表,所以其实是在head的前面
static inline void clinklist_add_back(clinknode_t *head, clinknode_t *node) {
node->prev = head->prev;
node->next = head;
node->prev->next = node;
head->prev = node;
}
// 判断链表是否为空:循环链表为空是头的下一个和上一个都指向自己
static inline int clinklist_is_empty(clinknode_t *head) {
return head == head->next;
}
// 从链表中移除自己,同时会重设结点
static inline void clinklist_remote(clinknode_t *node) {
node->next->prev = node->prev;
node->prev->next = node->next;
clinklist_init(node);
}
// 将链表1的结点取出来,放到链表2
static inline void clinklist_splice(clinknode_t *head1, clinknode_t *head2) {
if (!clinklist_is_empty(head1)) {
clinknode_t *first = head1->next; // 第1个结点
clinknode_t *last = head1->prev; // 最后1个结点
clinknode_t *at = head2->next; // 插在第2个链表的这个结点前面
first->prev = head2;
head2->next = first;
last->next = at;
at->prev = last;
clinklist_init(head1);
}
}
整个定时器的代码如下:
timerwheel.h
#pragma once
/**
* 定时器模块
*/
#include <stdio.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include "clinklist.h"
// 时间轮定时器
// 第1个轮占的位数
#define TVR_BITS 8
// 第1个轮的长度
#define TVR_SIZE (1 << TVR_BITS)
// 第n个轮占的位数
#define TVN_BITS 6
// 第n个轮的长度
#define TVN_SIZE (1 << TVN_BITS)
// 掩码:取模或整除用
#define TVR_MASK (TVR_SIZE - 1)
#define TVN_MASK (TVN_SIZE - 1)
// 定时器回调函数
typedef void (*timer_cb_t)(void*);
// 定时器结点
typedef struct timernode {
struct linknode *next; // 下一个结点
struct linknode *prev; // 上一个结点
void *userdata; // 用户数据
timer_cb_t callback; // 回调函数
uint32_t expire; // 到期时间
} timernode_t;
// 第1个轮
typedef struct tvroot {
clinknode_t vec[TVR_SIZE];
} tvroot_t;
// 后面几个轮
typedef struct tvnum {
clinknode_t vec[TVN_SIZE];
} tvnum_t;
// 时间轮定时器
typedef struct timerwheel {
tvroot_t tvroot; // 第1个轮
tvnum_t tv[4]; // 后面4个轮
uint64_t lasttime; // 上一次的时间毫秒
uint32_t currtick; // 当前的tick
uint16_t interval; // 每个时间点的毫秒间隔
uint16_t remainder; // 剩余的毫秒
} timerwheel_t;
// 初始化时间轮,interval为每帧的间隔,currtime为当前时间
void timerwheel_init(timerwheel_t *tw, uint16_t interval, uint64_t currtime);
// 初始化时间结点:cb为回调,ud为用户数据
void timerwheel_node_init(timernode_t *node, timer_cb_t cb, void *ud);
// 增加时间结点,ticks为触发间隔(注意是以interval为单位)
void timerwheel_add(timerwheel_t *tw, timernode_t *node, uint32_t ticks);
// 删除结点
int timerwheel_del(timerwheel_t *tw, timernode_t *node);
// 更新时间轮
void timerwheel_update(timerwheel_t *tw, uint64_t currtime);
timerwheel.c
#include "timerwheel.h"
#define FIRST_INDEX(v) ((v) & TVR_MASK)
#define NTH_INDEX(v, n) (((v) >> (TVR_BITS + (n) * TVN_BITS)) & TVN_MASK)
void timerwheel_init(timerwheel_t *tw, uint16_t interval, uint64_t currtime) {
memset(tw, 0, sizeof(*tw));
tw->interval = interval;
tw->lasttime = currtime;
int i, j;
for (i = 0; i < TVR_SIZE; ++i) {
clinklist_init(tw->tvroot.vec + i);
}
for (i = 0; i < 4; ++i) {
for (j = 0; j < TVN_SIZE; ++j)
clinklist_init(tw->tv[i].vec + j);
}
}
void timerwheel_node_init(timernode_t *node, timer_cb_t cb, void *ud) {
node->next = 0;
node->prev = 0;
node->userdata = ud;
node->callback = cb;
node->expire = 0;
}
static void _timerwheel_add(timerwheel_t *tw, timernode_t *node) {
uint32_t expire = node->expire;
uint32_t idx = expire - tw->currtick;
clinknode_t *head;
if (idx < TVR_SIZE) {
head = tw->tvroot.vec + FIRST_INDEX(expire);
} else {
int i;
uint64_t sz;
for (i = 0; i < 4; ++i) {
sz = (1ULL << (TVR_BITS + (i+1) * TVN_BITS));
if (idx < sz) {
idx = NTH_INDEX(expire, i);
head = tw->tv[i].vec + idx;
break;
}
}
}
clinklist_add_back(head, (clinknode_t*)node);
}
void timerwheel_add(timerwheel_t *tw, timernode_t *node, uint32_t ticks) {
node->expire = tw->currtick + ((ticks > 0) ? ticks : 1);
_timerwheel_add(tw, node);
}
int timerwheel_del(timerwheel_t *tw, timernode_t *node) {
if (!clinklist_is_empty((clinknode_t*)node)) {
clinklist_remote((clinknode_t*)node);
return 1;
}
return 0;
}
void _timerwheel_cascade(timerwheel_t *tw, tvnum_t *tv, int idx) {
clinknode_t head;
clinklist_init(&head);
clinklist_splice(tv->vec + idx, &head);
while (!clinklist_is_empty(&head)) {
timernode_t *node = (timernode_t*)head.next;
clinklist_remote(head.next);
_timerwheel_add(tw, node);
}
}
void _timerwheel_tick(timerwheel_t *tw) {
++tw->currtick;
uint32_t currtick = tw->currtick;
int index = (currtick & TVR_MASK);
if (index == 0) {
int i = 0;
int idx;
do {
idx = NTH_INDEX(tw->currtick, i);
_timerwheel_cascade(tw, &(tw->tv[i]), idx);
} while (idx == 0 && ++i < 4);
}
clinknode_t head;
clinklist_init(&head);
clinklist_splice(tw->tvroot.vec + index, &head);
while (!clinklist_is_empty(&head)) {
timernode_t *node = (timernode_t*)head.next;
clinklist_remote(head.next);
if (node->callback) {
node->callback(node->userdata);
}
}
}
void timerwheel_update(timerwheel_t *tw, uint64_t currtime) {
if (currtime > tw->lasttime) {
int diff = currtime - tw->lasttime + tw->remainder;
int intv = tw->interval;
tw->lasttime = currtime;
while (diff >= intv) {
diff -= intv;
_timerwheel_tick(tw);
}
tw->remainder = diff;
}
}
核心函数就两个:
- _timerwheel_add 把定时器结点加到时间轮里。
- _timerwheel_tick 指针往前走一步,如果指针到达0,则上一个圈的指针也会跳一步,此时要把上一个圈的链表取出来,重新加到当前圈里(_timerwheel_cascade)
这份代码参考了2.6内核的timer,其中涉及到一些位运算,不过通过上面的讲解,应该不难理解。有兴趣了解的还是看代码吧,代码才是最好的文档:)
本文摘自 :https://blog.51cto.com/u