题目
使用线程池执行Callable任务并处理结果
信息
- 类型:问答
- 难度:⭐⭐
考点
线程池配置,Callable与Future使用,异常处理,资源管理
快速回答
关键实现步骤:
- 使用
ThreadPoolExecutor创建可配置的线程池 - 提交
Callable任务获取Future对象 - 通过
Future.get()获取结果并处理异常 - 使用
shutdown()和awaitTermination()优雅关闭线程池 - 使用
ThreadLocal维护任务状态避免并发问题
问题场景
需要并发处理多个耗时计算任务(如数据校验、文件解析等),要求:
1. 控制最大并发线程数
2. 获取每个任务的返回结果
3. 正确处理任务执行异常
4. 实现资源优雅释放
核心实现代码
import java.util.concurrent.*;
import java.util.*;
class TaskScheduler {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final long KEEP_ALIVE_TIME = 30L;
public Map<String, Integer> executeTasks(List<CallableTask> tasks) {
// 1. 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
);
Map<String, Future<Integer>> futures = new HashMap<>();
Map<String, Integer> results = new ConcurrentHashMap<>();
try {
// 2. 提交所有任务
for (CallableTask task : tasks) {
futures.put(task.getId(), executor.submit(task));
}
// 3. 获取任务结果
for (Map.Entry<String, Future<Integer>> entry : futures.entrySet()) {
try {
results.put(entry.getKey(), entry.getValue().get());
} catch (ExecutionException e) {
// 处理任务内部异常
System.err.println("Task failed: " + entry.getKey()
+ ", Cause: " + e.getCause().getMessage());
results.put(entry.getKey(), -1); // 错误标识
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
System.err.println("Interrupted while waiting");
}
}
} finally {
// 4. 优雅关闭线程池
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
return results;
}
}
// 任务定义(使用ThreadLocal维护状态)
class CallableTask implements Callable<Integer> {
private final String id;
private final ThreadLocal<Integer> progress = new ThreadLocal<>();
public CallableTask(String id) { this.id = id; }
public String getId() { return id; }
@Override
public Integer call() throws Exception {
progress.set(0);
// 模拟计算过程
for (int i = 1; i <= 10; i++) {
TimeUnit.MILLISECONDS.sleep(100);
progress.set(i * 10); // 更新进度
if (i == 5 && id.equals("task2"))
throw new IllegalStateException("Simulated error");
}
return new Random().nextInt(100);
}
}关键原理说明
- 线程池配置:通过
ThreadPoolExecutor显式设置核心/最大线程数、存活时间和工作队列,避免使用Executors工厂方法可能导致的OOM风险 - Future机制:
Future.get()阻塞获取结果,需处理ExecutionException(封装任务异常)和InterruptedException - 资源释放:
shutdown()停止接收新任务,awaitTermination()等待已有任务完成,超时后调用shutdownNow()强制终止 - 线程安全:使用
ConcurrentHashMap存储结果,ThreadLocal维护任务私有状态
最佳实践
- 队列选择:根据场景选用队列类型
-LinkedBlockingQueue:无界队列(需防OOM)
-ArrayBlockingQueue:有界队列(需处理拒绝策略) - 拒绝策略:通过
RejectedExecutionHandler处理任务满的情况
- 默认AbortPolicy抛出异常
-CallerRunsPolicy由提交线程执行任务 - 异常处理:
- 在任务内捕获所有异常并返回错误码
- 或重写ThreadPoolExecutor.afterExecute()处理未捕获异常
常见错误
- ❌ 使用
Executors.newFixedThreadPool()创建无界队列导致OOM - ❌ 忽略
Future.get()的异常处理导致主线程崩溃 - ❌ 未调用
shutdown()导致JVM无法退出 - ❌ 在任务中使用共享变量未同步
扩展知识
- CompletableFuture:Java8+提供更强大的异步编程能力,支持链式调用和组合操作
- ForkJoinPool:适用于分治算法,工作窃取(work-stealing)机制提升CPU利用率
- 线程池监控:通过
ThreadPoolExecutor的getActiveCount()、getQueue().size()等方法实现运行时监控