题目
设计高并发可扩展的线程池任务调度系统
信息
- 类型:问答
- 难度:⭐⭐⭐
考点
线程池定制,资源限制,优先级调度,超时控制,异常处理
快速回答
核心实现要点:
- 使用
ThreadPoolExecutor定制核心参数和阻塞队列 - 通过
PriorityBlockingQueue实现任务优先级调度 - 结合
Semaphore实现系统级资源限制 - 使用
Future和超时机制控制任务执行时间 - 重写
afterExecute处理异常和资源释放
问题场景
在分布式系统中需要处理大量并发任务,要求:1)支持高中低三级优先级;2)单个任务最大执行时间10秒;3)系统总CPU使用率不超过80%;4)任务失败可追溯。
核心实现方案
1. 定制优先级线程池
public class PriorityThreadPool extends ThreadPoolExecutor {
public PriorityThreadPool(int coreSize, int maxSize) {
super(coreSize, maxSize, 60, TimeUnit.SECONDS,
new PriorityBlockingQueue<>(1000));
}
// 包装任务为优先级任务
public Future<?> submit(Runnable task, int priority) {
return super.submit(new PriorityTask(task, priority));
}
private static class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final Runnable task;
private final int priority;
public PriorityTask(Runnable task, int priority) {
this.task = task;
this.priority = priority;
}
@Override
public int compareTo(PriorityTask o) {
return Integer.compare(o.priority, this.priority); // 降序排列
}
@Override
public void run() {
task.run();
}
}
}2. 资源限制与超时控制
// 全局资源控制器
public class ResourceLimiter {
private final Semaphore cpuSemaphore = new Semaphore(Runtime.getRuntime().availableProcessors() * 8 / 10);
public <T> T executeWithLimit(Callable<T> task) throws Exception {
if (!cpuSemaphore.tryAcquire(2, TimeUnit.SECONDS)) {
throw new ResourceLimitException("CPU资源不足");
}
try {
Future<T> future = threadPool.submit(task, Priority.NORMAL);
return future.get(10, TimeUnit.SECONDS); // 超时控制
} finally {
cpuSemaphore.release();
}
}
}3. 异常处理与监控
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
future.get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
// 记录异常日志并告警
monitor.recordFailure(t);
}
}最佳实践
- 队列选择:
PriorityBlockingQueue需设置初始容量避免OOM - 拒绝策略:自定义策略记录任务并触发降级处理
- 资源隔离:关键任务使用独立线程池避免级联故障
- 监控指标:采集活跃线程数/队列大小/拒绝次数等核心指标
常见错误
- 优先级反转:低优先级任务持有高优先级任务所需资源
- 资源泄漏:未在finally块中释放Semaphore许可
- 线程饥饿:高优先级任务过多导致低优先级永远得不到执行
- 上下文切换:线程数超过CPU核心数导致性能下降
扩展知识
- 动态调参:通过JMX运行时调整corePoolSize/maxPoolSize
- 工作窃取:
ForkJoinPool适用于计算密集型任务 - 异步编排:CompletableFuture实现复杂任务依赖链
- 容器适配:在K8s环境中需结合cgroup限制资源