题目
设计高并发任务调度器:支持优先级与超时控制
信息
- 类型:问答
- 难度:⭐⭐⭐
考点
线程池定制, 优先级队列, 任务超时处理, 资源管理
快速回答
实现要点:
- 使用
PriorityBlockingQueue实现优先级任务队列 - 自定义
ThreadPoolExecutor并重写beforeExecute/afterExecute - 通过
Future.get(timeout)实现任务超时控制 - 采用
ThreadFactory定制线程命名和守护属性 - 使用
RejectedExecutionHandler处理拒绝策略
问题场景
在电商大促场景中,需要处理不同优先级的订单(VIP订单优先处理),同时确保单个任务执行不超过5秒,避免线程阻塞。
核心实现方案
1. 优先级任务队列
// 定义优先级任务
class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final Runnable task;
private final int priority; // 1-10, 10最高
public PriorityTask(Runnable task, int priority) {
this.task = task;
this.priority = priority;
}
@Override
public void run() { task.run(); }
@Override
public int compareTo(PriorityTask other) {
return Integer.compare(other.priority, this.priority); // 降序排列
}
}
// 创建优先级队列
BlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();2. 定制线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
4, // 核心线程数
10, // 最大线程数
60, TimeUnit.SECONDS, // 空闲线程存活时间
queue, // 优先级队列
new NamedThreadFactory("OrderProcessor"), // 定制线程
new CallerRunsPolicy() // 拒绝策略
) {
// 超时控制
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (r instanceof Future<?>) {
((Future<?>) r).get(5, TimeUnit.SECONDS);
}
}
// 异常处理
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (t != null) {
System.err.println("Task failed: " + t.getMessage());
}
}
};
// 提交任务
executor.submit(new PriorityTask(() -> processOrder(order), 8));3. 超时控制实现
Future<?> future = executor.submit(task);
try {
future.get(5, TimeUnit.SECONDS); // 阻塞等待超时
} catch (TimeoutException e) {
future.cancel(true); // 中断任务
System.err.println("Task timeout, interrupted");
}最佳实践
- 资源隔离:为不同优先级任务创建独立线程池
- 监控:通过
ThreadPoolExecutor的getActiveCount()等方法实时监控 - 优雅关闭:使用
executor.shutdownNow()+awaitTermination - 上下文传递:通过
InheritableThreadLocal传递请求上下文
常见错误
- 优先级反转:低优先级任务持有高优先级任务所需资源
- 线程泄漏:未正确处理任务异常导致线程终止
- 饥饿现象:高优先级任务过多导致低优先级任务无法执行
- 错误超时处理:未调用
future.cancel(true)导致线程无法回收
扩展知识
- 动态调参:通过
setCorePoolSize()动态调整线程数 - 工作窃取:使用
ForkJoinPool提升并行效率 - 异步编排:结合
CompletableFuture实现任务链 - 性能优化:监控队列堆积情况(
queue.size())及时扩容