背景:项目需要使用定时任务实现每隔N天定时重启一批设备,但由于硬件及现场资源限制,无法同时重启所有设备,需要每隔指定时间间隔重启其中几个设备(相当于实现一个任务分片的效果)。
基于以上需求,开始设计后端实现方案,最开始想到的就是队列+timer的方案,任务都存到队列中,然后通过一个定时器每隔指定时间间隔从队列中获取指定数量的任务进行执行,思前想后总是觉得有点麻烦,于是又苦思冥想有没有更好的方案,在了解了时间轮算法的特点后觉得完美契合需求。
1.时间轮算法概述
关于时间轮算法的概述可参考时间轮算法(TimingWheel)-CSDN博客
2.实现篇
2.1定义时间轮槽位(TimeSlot)
每个槽位中存储一个任务列表,在指定时刻任务列表中的任务会被取出执行。
import java.util.ArrayList;
import java.util.List;
public class TimeSlot {
private List<Runnable> tasks = new ArrayList<>();
public void addTask(Runnable task) {
tasks.add(task);
}
public List<Runnable> getTasks() {
return tasks;
}
public void clearTasks() {
tasks.clear();
}
}
2.2 定义时间轮(TimeWheel)
时间轮由一定数量的槽位组成,并负责任务的调度和执行:
public class TimeWheel {
/**
* 总认为个数
*/
private final int totalTaskCount;
/**
* 已执行任务的个数
*/
private int executedTaskCount = 0;
/**
* 每隔刻度的时间间隔,单位为毫秒
*/
private final int tickDuration;
/**
* 时间轮的大小,即槽位数量
*/
private final int tickDuration;
/**
* 时间轮的槽位数组
*/
private final int tickDuration;
/**
* 当前刻度位置
*/
private int currentTick = 0;
/**
* 执行任务的线程池
*/
private final ScheduledExecutorService scheduler;
public TimeWheel(int totalTaskCount, int tickDuration, int wheelSize) {
this.totalTaskCount = totalTaskCount;
this.tickDuration = tickDuration;
this.wheelSize = wheelSize;
this.slots = new TimeSlot[wheelSize];
for (int i = 0; i < wheelSize; i++) {
slots[i] = new TimeSlot();
}
scheduler = Executors.newSingleThreadScheduledExecutor();
start();
}
private void start() {
// initialDelay设置为500ms
scheduler.scheduleAtFixedRate(this::tick, 500, tickDuration, TimeUnit.MILLISECONDS);
}
private void tick() {
// 获取当前刻度的槽位
TimeSlot slot = slots[currentTick];
// 执行槽位中的所有任务
for (Runnable task : slot.getTasks()) {
// 任务执行
task.run();
// 如果已执行任务数等于总任务数则时间轮调度器自动退出
executedTaskCount++;
if (executedTaskCount == totalTaskCount){
this.stop();
}
}
// 清空当前槽位的任务
slot.clearTasks();
// 移动到下一个刻度
currentTick = (currentTick + 1) % wheelSize;
}
public void addTask(Runnable task, long delay) {
int ticks = (int) (delay / tickDuration);
int slotIndex = (currentTick + ticks) % wheelSize;
slots[slotIndex].addTask(task);
}
public void stop() {
scheduler.shutdown();
}
}
2.3使用样例
// 省略前面业务代码
boolean isEnableSharding = task.getEnablSharding();
if (isEnableSharding){
// 处理开启了分片策略的任务
List<List<NeInfo>> subNeInfoLists = new ArrayList<>();
// 每次执行任务的个数,用户由前端页面设定
int batchSize = task.getShardingThreshold();
// 总任务个数
int totalSize = neinfos.Size();
for(int i = 0; i < totalSizel; i+=batchSize){
int endIndex = Math.min(i + batchSize, totalSize);
subNeInfoList.add(neinfos.subList(i, endIndex));
}
int wheelSize = 0;
if (totalSize % batchSize == 0){
wheelSize = totalSize / batchSize;
}else{
wheelSize = totalSize / batchSize + 1;
}
// 初始化时间轮,tickDuration为页面设定的指定时间间隔(单位是秒,所以这里需要转为毫秒)
TimeWheel timeWheel = new TimeWheel(totalSize, task.getShardingTimeInterval*1000, wheelSize);
for (int i = 0; i < subNeInfoLists.size(); i++){
for(NeInfo neInfo : subNeInfoLists.get(i)){
timeWheel.addTask(() -> {
// 具体每个设备需要执行的任务
bsService.doTask(neInfo);
}, task.getShardingTimeInterval() * 1000 * i);
}
}
}