遍历定时器中所有节点,将剩余时间为 0s 的任务进行过期处理,在执行一个周期。
遍历周期:需要遍历链表中所有节点,时间复杂度 O(n),所以伴随着链表中的元素越来越多,速度也会越来越慢!
找到执行最后一个过期任务即可,无需遍历整个链表,时间复杂度 O(1),从上面的描述「有序列表定时器」的性能瓶颈在于插入时的任务排序,但是换来的就是缩短了遍历周期。
public abstract class Roulette {// 链表数据-主要用于存储每个延时任务节点 List tasks = null; // 游标指针索引 protected int index;// 时间轮轮盘的容量大小,如果是分钟级别,一般就是60个格 protected int capacity;// 时间轮轮盘的层级,如果是一级,它的上级就是二级 protected Integer level; private AtomicInteger num = new AtomicInteger(0); // 构造器 public Roulette(int capacity, Integer level) { this.capacity = capacity; this.level = level; this.tasks = new ArrayList<>(capacity); this.index = 0; } // 获取当前下表的索引对应的时间轮的任务 public TimewheelTask getTask() { return tasks.get(index); } // init初始化操作机制 public List init() { long interval = MathTool.power((capacity + 1), level); long add = 0; TimewheelTask delayTask = null; for (int i = 0; i < capacity; i++) { add += interval; if (level == 0) { delayTask = new DefaultDelayTask(level); } else { delayTask = new SplitDelayTask(level); } //已经转换为最小的时间间隔 delayTask.setDelay(add, TimeUnitProvider.getTimeUnit()); tasks.add(delayTask); } return tasks; } // 索引下标移动 public void indexAdd() { this.index++; if (this.index >= capacity) { this.index = 0; } } // 添加对应的任务到对应的队列里面 public void addTask(TimewheelTask task) { tasks.add(task); }// 给子类提供的方法进行实现对应的任务添加功能 public abstract void addTask(int interval, MyTask task);}
List tasks = null;
tasks也可以改成双向链表+ 数组的结构:即节点存贮的对象中有指针,组成环形,可以通过数组的下标灵活访问每个节点,类似 LinkedHashMap。
protected int index;
protected int capacity;
protected Integer level;
public List init() { // 那么整个时间轮的总体时间跨度(interval) long interval = MathTool.power((capacity + 1), level); long add = 0; TimewheelTask delayTask = null; for (int i = 0; i < capacity; i++) { add += interval; if (level == 0) { delayTask = new ExecuteTimewheelTask(level); } else { delayTask = new MoveTimewheelTask(level); } //已经转换为最小的时间间隔 delayTask.setDelay(add, TimeUnitProvider.getTimeUnit()); tasks.add(delayTask); } return tasks;}
例如,第一层:20 ,第二层:20^2 ......
//例如 n=7 二进制 0 1 1 1 //a的n次幂 = a的2次幂×a的2次幂 × a的1次幂×a的1次幂 ×a public static long power(long a, int n) { int rtn = 1; while (n >= 1) { if((n & 1) == 1){ rtn *= a; } a *= a; n = n >> 1; } return rtn; }
public class TimeUnitProvider { private static TimeUnit unit = TimeUnit.SECONDS; public static TimeUnit getTimeUnit() { return unit; }}
public TimewheelTask getTask() { return tasks.get(index);}
public class TimewheelBucket extends Roulette { public TimewheelBucket(int capacity, Integer level) { super(capacity, level); } public synchronized void addTask(int interval, MyTask task) { interval -= 1; int curIndex = interval + this.index; if (curIndex >= capacity) { curIndex = curIndex - capacity; } tasks.get(curIndex).addTask(task); }}
public synchronized void addTask(int interval, BizTask task) { interval -= 1; int curIndex = interval + this.index; if (curIndex >= capacity) { curIndex = curIndex - capacity; } tasks.get(curIndex).addTask(task); }
public interface Delayed extends Comparable { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit);}
@Getterpublic abstract class TimewheelTask implements Delayed { private List tasks = new ArrayList(); private int level; private Long delay; private long calDelay; private TimeUnit calUnit; public TimewheelTask(int level) { this.level = level; } public void setDelay(Long delay, TimeUnit unit) { this.calDelay=delay; this.calUnit=unit; } public void calDelay() { this.delay = TimeUnit.NANOSECONDS.convert(this.calDelay, this.calUnit) + System.nanoTime(); } public long getDelay(TimeUnit unit) { return this.delay - System.nanoTime(); } public int compareTo(Delayed o) { long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } public void addTask(BizTask task) { synchronized (this) { tasks.add(task); } } public void clear() { tasks.clear(); } public abstract void run();}
业务任务集合:private List
private int level;
private Long delay;
private long calDelay;
private TimeUnit calUnit;
public void addTask(BizTask task) { synchronized (this) { tasks.add(task); } }
public class ExecuteTimewheelTask extends TimewheelTask { public ExecuteTimewheelTask(int level) { super(level); } //到时间执行所有的任务 public void run() { List tasks = getTasks(); if (CollectionUtils.isNotEmpty(tasks)) { tasks.forEach(task -> ThreadPool.submit(task)); } }}
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 3, TimeUnit.MINUTES, new LinkedBlockingQueue(10000), new MyThreadFactory("executor"), new ThreadPoolExecutor.CallerRunsPolicy());
public class MoveTimewheelTask extends TimewheelTask { public MoveTimewheelTask(int level) { super(level); } //跃迁到其他轮盘,将对应的任务 public void run() { List tasks = getTasks(); if (CollectionUtils.isNotEmpty(tasks)) { tasks.forEach(task -> { long delay = task.getDelay(); TimerWheel.adddTask(task,delay, TimeUnitProvider.getTimeUnit()); }); } }}
public abstract class BizTask implements Runnable { protected long interval; protected int index; protected long executeTime; public BizTask(long interval, TimeUnit unit, int index) { this.interval = interval; this.index = index; this.executeTime= TimeUnitProvider.getTimeUnit().convert(interval,unit)+TimeUnitProvider.getTimeUnit().convert(System.nanoTime(),TimeUnit.NANOSECONDS); } public long getDelay() { return this.executeTime - TimeUnitProvider.getTimeUnit().convert(System.nanoTime(), TimeUnit.NANOSECONDS); }}
public long getDelay() { return this.executeTime - TimeUnitProvider.getTimeUnit().convert(System.nanoTime(), TimeUnit.NANOSECONDS); }
public class TimerWheel { private static Map cache = new ConcurrentHashMap<>(); //一个轮表示三十秒 private static int interval = 30; private static wheelThread wheelThread; public static void adddTask(BizTask task, Long time, TimeUnit unit) { if(task == null){ return; } long intervalTime = TimeUnitProvider.getTimeUnit().convert(time, unit); if(intervalTime < 1){ ThreadPool.submit(task); return; } Integer[] wheel = getWheel(intervalTime,interval); TimewheelBucket taskList = cache.get(wheel[0]); if (taskList != null) { taskList.addTask(wheel[1], task); } else { synchronized (cache) { if (cache.get(wheel[0]) == null) { taskList = new TimewheelBucket(interval-1, wheel[0]); wheelThread.add(taskList.init()); cache.putIfAbsent(wheel[0],taskList); } } taskList.addTask(wheel[1], task); } } static{ interval = 30; wheelThread = new wheelThread(); wheelThread.setDaemon(false); wheelThread.start(); } private static Integer[] getWheel(long intervalTime,long baseInterval) { //转换后的延时时间 if (intervalTime < baseInterval) { return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))}; } else { return getWheel(intervalTime,baseInterval,baseInterval, 1); } } private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) { long nextInterval = baseInterval * interval; if (intervalTime < nextInterval) { return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))}; } else { return getWheel(intervalTime,baseInterval,nextInterval, (p+1)); } } static class wheelThread extends Thread { DelayQueue queue = new DelayQueue(); public DelayQueue getQueue() { return queue; } public void add(List tasks) { if (CollectionUtils.isNotEmpty(tasks)) { tasks.forEach(task -> add(task)); } } public void add(TimewheelTask task) { task.calDelay(); queue.add(task); } @Override public void run() { while (true) { try { TimewheelTask task = queue.take(); int p = task.getLevel(); long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p)))); TimewheelBucket timewheelBucket = cache.get(p); synchronized (timewheelBucket) { timewheelBucket.indexAdd(); task.run(); task.clear(); } task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit()); task.calDelay(); queue.add(task); } catch (InterruptedException e) { } } } }}
private static Map cache = new ConcurrentHashMap<>();
private static int interval = 30;
private static wheelThread wheelThread; static{ interval = 30; wheelThread = new wheelThread(); wheelThread.setDaemon(false); wheelThread.start();} static class wheelThread extends Thread { DelayQueue queue = new DelayQueue(); public DelayQueue getQueue() { return queue; } public void add(List tasks) { if (CollectionUtils.isNotEmpty(tasks)) { tasks.forEach(task -> add(task)); } } public void add(TimewheelTask task) { task.calDelay(); queue.add(task); } @Override public void run() { while (true) { try { TimewheelTask task = queue.take(); int p = task.getLevel(); long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p)))); TimewheelBucket timewheelBucket = cache.get(p); synchronized (timewheelBucket) { timewheelBucket.indexAdd(); task.run(); task.clear(); } task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit()); task.calDelay(); queue.add(task); } catch (InterruptedException e) { } } }
private static Integer[] getWheel(long intervalTime,long baseInterval) { //转换后的延时时间 if (intervalTime < baseInterval) { return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))}; } else { return getWheel(intervalTime,baseInterval,baseInterval, 1); } } private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) { long nextInterval = baseInterval * interval; if (intervalTime < nextInterval) { return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))}; } else { return getWheel(intervalTime,baseInterval,nextInterval, (p+1)); } }