侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

使用线程池执行Callable任务并处理结果

2025-12-5 / 0 评论 / 5 阅读

题目

使用线程池执行Callable任务并处理结果

信息

  • 类型:问答
  • 难度:⭐⭐

考点

线程池配置,Callable与Future使用,异常处理,资源管理

快速回答

关键实现步骤:

  1. 使用ThreadPoolExecutor创建可配置的线程池
  2. 提交Callable任务获取Future对象
  3. 通过Future.get()获取结果并处理异常
  4. 使用shutdown()awaitTermination()优雅关闭线程池
  5. 使用ThreadLocal维护任务状态避免并发问题
## 解析

问题场景

需要并发处理多个耗时计算任务(如数据校验、文件解析等),要求:
1. 控制最大并发线程数
2. 获取每个任务的返回结果
3. 正确处理任务执行异常
4. 实现资源优雅释放

核心实现代码

import java.util.concurrent.*;
import java.util.*;

class TaskScheduler {
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final long KEEP_ALIVE_TIME = 30L;

    public Map<String, Integer> executeTasks(List<CallableTask> tasks) {
        // 1. 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, 
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100)
        );

        Map<String, Future<Integer>> futures = new HashMap<>();
        Map<String, Integer> results = new ConcurrentHashMap<>();

        try {
            // 2. 提交所有任务
            for (CallableTask task : tasks) {
                futures.put(task.getId(), executor.submit(task));
            }

            // 3. 获取任务结果
            for (Map.Entry<String, Future<Integer>> entry : futures.entrySet()) {
                try {
                    results.put(entry.getKey(), entry.getValue().get());
                } catch (ExecutionException e) {
                    // 处理任务内部异常
                    System.err.println("Task failed: " + entry.getKey() 
                        + ", Cause: " + e.getCause().getMessage());
                    results.put(entry.getKey(), -1); // 错误标识
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // 恢复中断状态
                    System.err.println("Interrupted while waiting");
                }
            }
        } finally {
            // 4. 优雅关闭线程池
            executor.shutdown();
            try {
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        return results;
    }
}

// 任务定义(使用ThreadLocal维护状态)
class CallableTask implements Callable<Integer> {
    private final String id;
    private final ThreadLocal<Integer> progress = new ThreadLocal<>();

    public CallableTask(String id) { this.id = id; }
    public String getId() { return id; }

    @Override
    public Integer call() throws Exception {
        progress.set(0);
        // 模拟计算过程
        for (int i = 1; i <= 10; i++) {
            TimeUnit.MILLISECONDS.sleep(100);
            progress.set(i * 10); // 更新进度
            if (i == 5 && id.equals("task2")) 
                throw new IllegalStateException("Simulated error");
        }
        return new Random().nextInt(100);
    }
}

关键原理说明

  • 线程池配置:通过ThreadPoolExecutor显式设置核心/最大线程数、存活时间和工作队列,避免使用Executors工厂方法可能导致的OOM风险
  • Future机制Future.get()阻塞获取结果,需处理ExecutionException(封装任务异常)和InterruptedException
  • 资源释放shutdown()停止接收新任务,awaitTermination()等待已有任务完成,超时后调用shutdownNow()强制终止
  • 线程安全:使用ConcurrentHashMap存储结果,ThreadLocal维护任务私有状态

最佳实践

  1. 队列选择:根据场景选用队列类型
    - LinkedBlockingQueue:无界队列(需防OOM)
    - ArrayBlockingQueue:有界队列(需处理拒绝策略)
  2. 拒绝策略:通过RejectedExecutionHandler处理任务满的情况
    - 默认AbortPolicy抛出异常
    - CallerRunsPolicy由提交线程执行任务
  3. 异常处理
    - 在任务内捕获所有异常并返回错误码
    - 或重写ThreadPoolExecutor.afterExecute()处理未捕获异常

常见错误

  • ❌ 使用Executors.newFixedThreadPool()创建无界队列导致OOM
  • ❌ 忽略Future.get()的异常处理导致主线程崩溃
  • ❌ 未调用shutdown()导致JVM无法退出
  • ❌ 在任务中使用共享变量未同步

扩展知识

  • CompletableFuture:Java8+提供更强大的异步编程能力,支持链式调用和组合操作
  • ForkJoinPool:适用于分治算法,工作窃取(work-stealing)机制提升CPU利用率
  • 线程池监控:通过ThreadPoolExecutorgetActiveCount()getQueue().size()等方法实现运行时监控