题目
线程池实现多线程计算平方和及异常处理
信息
- 类型:问答
- 难度:⭐⭐
考点
线程池配置,Callable与Future使用,任务拆分策略,多线程异常处理,资源管理
快速回答
实现要点:
- 使用
ThreadPoolExecutor自定义线程池,避免Executors的潜在问题 - 通过
Callable定义计算任务,返回Future获取结果 - 将大任务拆分为多个子任务(如按区间拆分)
- 使用
Future.get()捕获ExecutionException处理计算异常 - 确保
finally块中关闭线程池
问题场景
计算1~n的平方和(sum = 1² + 2² + ... + n²),当n较大时(如1亿),单线程计算效率低。要求使用线程池实现多线程计算,并处理计算过程中可能出现的异常(如数值溢出)。
核心实现方案
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;
public class ParallelSumCalculator {
// 自定义线程池配置
private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();
private static final int MAX_POOL_SIZE = CORE_POOL_SIZE * 2;
private static final long KEEP_ALIVE_TIME = 60L;
private static final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingQueue<>(100);
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
WORK_QUEUE,
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
long n = 100_000_000;
int taskCount = CORE_POOL_SIZE; // 任务数=CPU核心数
long segment = n / taskCount;
List<Future<Long>> futures = new ArrayList<>();
try {
// 任务拆分与提交
for (int i = 0; i < taskCount; i++) {
long start = i * segment + 1;
long end = (i == taskCount - 1) ? n : (i + 1) * segment;
futures.add(executor.submit(new SquareSumTask(start, end)));
}
// 结果合并
long totalSum = 0;
for (Future<Long> future : futures) {
try {
totalSum += future.get(); // 阻塞获取结果
} catch (ExecutionException e) {
// 处理任务执行异常
Throwable cause = e.getCause();
if (cause instanceof ArithmeticException) {
System.err.println("计算错误: " + cause.getMessage());
}
// 其他异常处理逻辑
}
}
System.out.println("Total sum: " + totalSum);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
executor.shutdown(); // 关闭线程池
}
}
// Callable任务定义
static class SquareSumTask implements Callable<Long> {
private final long start;
private final long end;
public SquareSumTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
public Long call() {
long sum = 0;
for (long i = start; i <= end; i++) {
// 显式检查溢出
if (i > Integer.MAX_VALUE) {
throw new ArithmeticException("数值溢出: " + i);
}
sum += i * i;
}
return sum;
}
}
}关键原理说明
- 线程池配置:根据CPU核心数动态设置线程池大小,使用有界队列防止OOM,设置
CallerRunsPolicy拒绝策略保证任务不丢失 - 任务拆分:将大区间平均拆分为多个子区间(任务数=CPU核心数),避免线程过多导致上下文切换开销
- 异常传递:
Callable.call()抛出的异常会被包装在ExecutionException中,通过Future.get()捕获处理 - 资源管理:
finally块确保线程池关闭,防止线程泄漏
最佳实践
- 线程池选择:始终使用
ThreadPoolExecutor而非Executors,避免FixedThreadPool/CachedThreadPool的OOM风险 - 任务粒度:子任务大小应足够大(至少10,000次计算),避免任务调度开销成为瓶颈
- 异常处理:在任务内部检查临界条件(如数值溢出),而非依赖外部校验
- 结果合并:使用
CompletableFuture可以更优雅地处理结果聚合(Java 8+)
常见错误
- 线程池配置不当:使用无界队列导致OOM,或线程数过多降低性能
- 未处理中断:忽略
InterruptedException导致线程无法及时响应中断 - 异常吞没:未处理
Future.get()抛出的ExecutionException,使主线程无法感知任务失败 - 资源泄漏:忘记关闭线程池,导致JVM无法退出
扩展知识
- Fork/Join框架:对于递归型任务(如归并排序),使用
ForkJoinPool效率更高 - 工作窃取(Work Stealing):
ForkJoinPool的特性,空闲线程从其他线程队列尾部窃取任务 - 异步编程:
CompletableFuture支持链式调用和组合操作,简化多线程协作 - 性能监控:通过
ThreadPoolExecutor的getCompletedTaskCount()等方法实现运行时监控