侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

设计一个支持任务优先级的多线程文件下载器

2025-12-13 / 0 评论 / 4 阅读

题目

设计一个支持任务优先级的多线程文件下载器

信息

  • 类型:问答
  • 难度:⭐⭐

考点

线程池配置,任务优先级处理,线程间协调,异常处理,资源管理

快速回答

实现一个支持任务优先级的多线程下载器需要:

  • 使用ThreadPoolExecutor配合PriorityBlockingQueue实现优先级队列
  • 自定义Runnable实现Comparable接口定义优先级规则
  • 使用CountDownLatch确保所有下载完成后再合并文件
  • 正确处理线程中断和IO异常
  • 实现文件分块下载和合并逻辑
## 解析

原理说明

多线程下载器通过将大文件分割成多个块并行下载来提高速度。优先级队列允许高优先级任务(如VIP用户请求)优先执行。核心组件:

  1. 优先级线程池:使用PriorityBlockingQueue作为工作队列
  2. 任务分片:根据文件大小和线程数计算下载区间
  3. 任务协调CountDownLatch同步下载完成事件
  4. 错误处理:捕获下载异常并中断相关任务

代码示例

import java.util.concurrent.*;
import java.io.*;
import java.net.*;

// 优先级任务实现
class DownloadTask implements Runnable, Comparable<DownloadTask> {
    private final int priority;
    private final String url;
    private final File outputFile;
    private final long start;
    private final long end;

    public DownloadTask(int priority, String url, File outputFile, long start, long end) {
        this.priority = priority;
        this.url = url;
        this.outputFile = outputFile;
        this.start = start;
        this.end = end;
    }

    @Override
    public void run() {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setRequestProperty("Range", "bytes=" + start + "-" + end);
            try (InputStream in = conn.getInputStream();
                 RandomAccessFile raf = new RandomAccessFile(outputFile, "rw")) {
                raf.seek(start);
                byte[] buffer = new byte[8192];
                int bytesRead;
                while ((bytesRead = in.read(buffer)) != -1) {
                    if (Thread.currentThread().isInterrupted()) {
                        System.out.println("任务被中断: " + priority);
                        return;
                    }
                    raf.write(buffer, 0, bytesRead);
                }
            }
        } catch (IOException e) {
            Thread.currentThread().interrupt(); // 传递中断状态
            System.err.println("下载失败: " + e.getMessage());
        }
    }

    @Override
    public int compareTo(DownloadTask other) {
        return Integer.compare(other.priority, this.priority); // 降序排列
    }
}

// 下载管理器
public class PriorityDownloader {
    private final ThreadPoolExecutor executor;

    public PriorityDownloader(int poolSize) {
        executor = new ThreadPoolExecutor(
            poolSize, 
            poolSize,
            0L, 
            TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<>()
        );
    }

    public void downloadFile(String url, File outputFile, int priority) throws Exception {
        long fileSize = getRemoteFileSize(url);
        int chunks = executor.getMaximumPoolSize();
        long chunkSize = fileSize / chunks;
        CountDownLatch latch = new CountDownLatch(chunks);

        try (RandomAccessFile file = new RandomAccessFile(outputFile, "rw")) {
            file.setLength(fileSize); // 预分配空间
        }

        for (int i = 0; i < chunks; i++) {
            long start = i * chunkSize;
            long end = (i == chunks - 1) ? fileSize - 1 : start + chunkSize - 1;
            executor.execute(() -> {
                new DownloadTask(priority, url, outputFile, start, end).run();
                latch.countDown();
            });
        }

        latch.await(); // 等待所有分块完成
        System.out.println("文件下载完成: " + outputFile.getName());
    }

    private long getRemoteFileSize(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("HEAD");
        return conn.getContentLengthLong();
    }

    public void shutdown() {
        executor.shutdownNow();
    }

    // 使用示例
    public static void main(String[] args) throws Exception {
        PriorityDownloader downloader = new PriorityDownloader(4);
        downloader.downloadFile("http://example.com/large.zip", new File("large.zip"), 5);
        downloader.downloadFile("http://example.com/urgent.doc", new File("urgent.doc"), 9); // 更高优先级
        downloader.shutdown();
    }
}

最佳实践

  • 线程池配置:核心/最大线程数设为CPU核心数×2,避免过多线程导致上下文切换开销
  • 资源清理:在finally块中关闭所有IO资源
  • 中断处理:定期检查Thread.interrupted(),及时响应取消请求
  • 队列选择:小任务用PriorityBlockingQueue,大任务用LinkedBlockingQueue
  • 流量控制:添加限流机制(如Semaphore)防止带宽过载

常见错误

  • 优先级反转:低优先级任务持有高优先级任务所需资源,可通过锁分离解决
  • 线程饥饿:持续提交高优先级任务导致低优先级任务无法执行,需设置最大等待时间
  • 分片错误:未处理文件大小不能被线程数整除的情况,最后一个分片需特殊处理
  • 资源泄漏:未关闭HttpURLConnection或文件句柄,使用try-with-resources避免
  • 状态同步:多线程写入同一文件未同步,应使用RandomAccessFile分区间写入

扩展知识

  • 动态优先级调整:根据任务等待时间自动提升优先级(避免饥饿)
  • 断点续传:记录已下载分片位置,重启时恢复下载
  • 混合队列PriorityBlockingQueueLinkedBlockingQueue结合处理不同规模任务
  • 性能监控:通过ThreadPoolExecutor的钩子方法记录任务执行时间
  • 替代方案:考虑ForkJoinPool用于递归任务,或CompletableFuture编排复杂依赖