题目
设计一个支持任务优先级的多线程文件下载器
信息
- 类型:问答
- 难度:⭐⭐
考点
线程池配置,任务优先级处理,线程间协调,异常处理,资源管理
快速回答
实现一个支持任务优先级的多线程下载器需要:
- 使用
ThreadPoolExecutor配合PriorityBlockingQueue实现优先级队列 - 自定义
Runnable实现Comparable接口定义优先级规则 - 使用
CountDownLatch确保所有下载完成后再合并文件 - 正确处理线程中断和IO异常
- 实现文件分块下载和合并逻辑
原理说明
多线程下载器通过将大文件分割成多个块并行下载来提高速度。优先级队列允许高优先级任务(如VIP用户请求)优先执行。核心组件:
- 优先级线程池:使用
PriorityBlockingQueue作为工作队列 - 任务分片:根据文件大小和线程数计算下载区间
- 任务协调:
CountDownLatch同步下载完成事件 - 错误处理:捕获下载异常并中断相关任务
代码示例
import java.util.concurrent.*;
import java.io.*;
import java.net.*;
// 优先级任务实现
class DownloadTask implements Runnable, Comparable<DownloadTask> {
private final int priority;
private final String url;
private final File outputFile;
private final long start;
private final long end;
public DownloadTask(int priority, String url, File outputFile, long start, long end) {
this.priority = priority;
this.url = url;
this.outputFile = outputFile;
this.start = start;
this.end = end;
}
@Override
public void run() {
try {
HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
conn.setRequestProperty("Range", "bytes=" + start + "-" + end);
try (InputStream in = conn.getInputStream();
RandomAccessFile raf = new RandomAccessFile(outputFile, "rw")) {
raf.seek(start);
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("任务被中断: " + priority);
return;
}
raf.write(buffer, 0, bytesRead);
}
}
} catch (IOException e) {
Thread.currentThread().interrupt(); // 传递中断状态
System.err.println("下载失败: " + e.getMessage());
}
}
@Override
public int compareTo(DownloadTask other) {
return Integer.compare(other.priority, this.priority); // 降序排列
}
}
// 下载管理器
public class PriorityDownloader {
private final ThreadPoolExecutor executor;
public PriorityDownloader(int poolSize) {
executor = new ThreadPoolExecutor(
poolSize,
poolSize,
0L,
TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<>()
);
}
public void downloadFile(String url, File outputFile, int priority) throws Exception {
long fileSize = getRemoteFileSize(url);
int chunks = executor.getMaximumPoolSize();
long chunkSize = fileSize / chunks;
CountDownLatch latch = new CountDownLatch(chunks);
try (RandomAccessFile file = new RandomAccessFile(outputFile, "rw")) {
file.setLength(fileSize); // 预分配空间
}
for (int i = 0; i < chunks; i++) {
long start = i * chunkSize;
long end = (i == chunks - 1) ? fileSize - 1 : start + chunkSize - 1;
executor.execute(() -> {
new DownloadTask(priority, url, outputFile, start, end).run();
latch.countDown();
});
}
latch.await(); // 等待所有分块完成
System.out.println("文件下载完成: " + outputFile.getName());
}
private long getRemoteFileSize(String url) throws IOException {
HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
conn.setRequestMethod("HEAD");
return conn.getContentLengthLong();
}
public void shutdown() {
executor.shutdownNow();
}
// 使用示例
public static void main(String[] args) throws Exception {
PriorityDownloader downloader = new PriorityDownloader(4);
downloader.downloadFile("http://example.com/large.zip", new File("large.zip"), 5);
downloader.downloadFile("http://example.com/urgent.doc", new File("urgent.doc"), 9); // 更高优先级
downloader.shutdown();
}
}最佳实践
- 线程池配置:核心/最大线程数设为CPU核心数×2,避免过多线程导致上下文切换开销
- 资源清理:在
finally块中关闭所有IO资源 - 中断处理:定期检查
Thread.interrupted(),及时响应取消请求 - 队列选择:小任务用
PriorityBlockingQueue,大任务用LinkedBlockingQueue - 流量控制:添加限流机制(如Semaphore)防止带宽过载
常见错误
- 优先级反转:低优先级任务持有高优先级任务所需资源,可通过锁分离解决
- 线程饥饿:持续提交高优先级任务导致低优先级任务无法执行,需设置最大等待时间
- 分片错误:未处理文件大小不能被线程数整除的情况,最后一个分片需特殊处理
- 资源泄漏:未关闭
HttpURLConnection或文件句柄,使用try-with-resources避免 - 状态同步:多线程写入同一文件未同步,应使用
RandomAccessFile分区间写入
扩展知识
- 动态优先级调整:根据任务等待时间自动提升优先级(避免饥饿)
- 断点续传:记录已下载分片位置,重启时恢复下载
- 混合队列:
PriorityBlockingQueue与LinkedBlockingQueue结合处理不同规模任务 - 性能监控:通过
ThreadPoolExecutor的钩子方法记录任务执行时间 - 替代方案:考虑
ForkJoinPool用于递归任务,或CompletableFuture编排复杂依赖