侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

设计高并发可扩展的线程池任务调度系统

2025-12-12 / 0 评论 / 4 阅读

题目

设计高并发可扩展的线程池任务调度系统

信息

  • 类型:问答
  • 难度:⭐⭐⭐

考点

线程池定制,资源限制,优先级调度,超时控制,异常处理

快速回答

核心实现要点:

  • 使用ThreadPoolExecutor定制核心参数和阻塞队列
  • 通过PriorityBlockingQueue实现任务优先级调度
  • 结合Semaphore实现系统级资源限制
  • 使用Future和超时机制控制任务执行时间
  • 重写afterExecute处理异常和资源释放
## 解析

问题场景

在分布式系统中需要处理大量并发任务,要求:1)支持高中低三级优先级;2)单个任务最大执行时间10秒;3)系统总CPU使用率不超过80%;4)任务失败可追溯。

核心实现方案

1. 定制优先级线程池

public class PriorityThreadPool extends ThreadPoolExecutor {
    public PriorityThreadPool(int coreSize, int maxSize) {
        super(coreSize, maxSize, 60, TimeUnit.SECONDS,
            new PriorityBlockingQueue<>(1000));
    }

    // 包装任务为优先级任务
    public Future<?> submit(Runnable task, int priority) {
        return super.submit(new PriorityTask(task, priority));
    }

    private static class PriorityTask implements Runnable, Comparable<PriorityTask> {
        private final Runnable task;
        private final int priority;

        public PriorityTask(Runnable task, int priority) {
            this.task = task;
            this.priority = priority;
        }

        @Override
        public int compareTo(PriorityTask o) {
            return Integer.compare(o.priority, this.priority); // 降序排列
        }

        @Override
        public void run() {
            task.run();
        }
    }
}

2. 资源限制与超时控制

// 全局资源控制器
public class ResourceLimiter {
    private final Semaphore cpuSemaphore = new Semaphore(Runtime.getRuntime().availableProcessors() * 8 / 10);

    public <T> T executeWithLimit(Callable<T> task) throws Exception {
        if (!cpuSemaphore.tryAcquire(2, TimeUnit.SECONDS)) {
            throw new ResourceLimitException("CPU资源不足");
        }
        try {
            Future<T> future = threadPool.submit(task, Priority.NORMAL);
            return future.get(10, TimeUnit.SECONDS); // 超时控制
        } finally {
            cpuSemaphore.release();
        }
    }
}

3. 异常处理与监控

@Override
protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    if (t == null && r instanceof Future<?>) {
        try {
            Future<?> future = (Future<?>) r;
            future.get();
        } catch (CancellationException ce) {
            t = ce;
        } catch (ExecutionException ee) {
            t = ee.getCause();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
    if (t != null) {
        // 记录异常日志并告警
        monitor.recordFailure(t);
    }
}

最佳实践

  • 队列选择PriorityBlockingQueue需设置初始容量避免OOM
  • 拒绝策略:自定义策略记录任务并触发降级处理
  • 资源隔离:关键任务使用独立线程池避免级联故障
  • 监控指标:采集活跃线程数/队列大小/拒绝次数等核心指标

常见错误

  • 优先级反转:低优先级任务持有高优先级任务所需资源
  • 资源泄漏:未在finally块中释放Semaphore许可
  • 线程饥饿:高优先级任务过多导致低优先级永远得不到执行
  • 上下文切换:线程数超过CPU核心数导致性能下降

扩展知识

  • 动态调参:通过JMX运行时调整corePoolSize/maxPoolSize
  • 工作窃取ForkJoinPool适用于计算密集型任务
  • 异步编排:CompletableFuture实现复杂任务依赖链
  • 容器适配:在K8s环境中需结合cgroup限制资源