侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

设计高并发任务处理器:实现带流量控制和错误恢复的异步任务队列

2025-12-11 / 0 评论 / 5 阅读

题目

设计高并发任务处理器:实现带流量控制和错误恢复的异步任务队列

信息

  • 类型:问答
  • 难度:⭐⭐⭐

考点

Future高级用法, Stream控制流, Isolate并发处理, 错误恢复机制, 背压(Backpressure)管理

快速回答

实现要点:

  • 使用StreamController创建任务队列和完成通知流
  • 通过CompleterFuture.wait实现并发控制
  • 利用Isolate.run执行CPU密集型任务
  • 错误处理:
    • 任务重试机制(指数退避)
    • 全局错误捕获
    • 隔离区错误传递
  • 背压管理:
    • 动态调整并发度
    • 队列积压预警
## 解析

问题场景

在需要处理10,000+个耗时任务的场景中(如批量图片处理、数据加密等),直接使用Future.wait会导致内存溢出。需要实现:

  1. 动态并发控制(最大并行任务数)
  2. 任务失败自动重试(带退避策略)
  3. 实时进度报告
  4. 资源隔离(CPU密集型任务)
  5. 背压处理(队列积压预警)

核心实现方案

class TaskProcessor {
  final int maxConcurrency;
  final _controller = StreamController<Task>();
  final _completers = <Completer>[];
  int _activeCount = 0;
  int _completed = 0;

  TaskProcessor({this.maxConcurrency = 4});

  // 添加任务到队列
  void addTask(Task task) {
    _controller.add(task);
    if (_controller.stream.length > maxConcurrency * 2) {
      print('警告:任务积压超过阈值!');
    }
  }

  // 启动处理器
  Future<void> run() async {
    await for (final task in _controller.stream) {
      // 背压控制:当活跃任务达到上限时等待
      while (_activeCount >= maxConcurrency) {
        await Future.delayed(Duration(milliseconds: 100));
      }

      _activeCount++;
      final completer = Completer();
      _completers.add(completer);

      // 使用Isolate执行CPU密集型任务
      unawaited(_executeInIsolate(task, completer));
    }
  }

  Future<void> _executeInIsolate(Task task, Completer completer) async {
    int retryCount = 0;
    const maxRetries = 3;

    Future<void> runTask() async {
      try {
        // 使用Isolate.run避免阻塞主事件循环
        await Isolate.run(() => task.process());
        completer.complete();
      } catch (e, st) {
        if (retryCount < maxRetries) {
          retryCount++;
          final delay = Duration(seconds: 1 << retryCount); // 指数退避
          await Future.delayed(delay);
          await runTask(); // 递归重试
        } else {
          completer.completeError(TaskFailure(task.id, e, st));
        }
      } finally {
        _activeCount--;
        _completed++;
        print('进度: ${_completed}/${_controller.stream.length}');
      }
    }

    await runTask();
  }

  // 等待所有任务完成
  Future<void> waitDone() => Future.wait(_completers.map((c) => c.future));
}

// 使用示例
void main() async {
  final processor = TaskProcessor(maxConcurrency: 5);

  // 添加10000个任务
  for (int i = 0; i < 10000; i++) {
    processor.addTask(ImageCompressTask(i));
  }

  // 启动处理并等待
  unawaited(processor.run());
  await processor.waitDone();
}

关键设计原理

  • 并发控制:通过_activeCount计数器限制并行任务数,避免资源耗尽
  • 背压管理:当队列长度超过maxConcurrency*2时发出警告,动态调整生产速度
  • 错误恢复
    1. 指数退避重试:1s/4s/8s延迟
    2. 隔离区错误捕获:通过Isolate.run捕获异常并传回主Isolate
    3. 错误封装:使用TaskFailure保留原始错误栈
  • 资源隔离:CPU密集型任务在独立Isolate中执行,不阻塞事件循环

最佳实践

  • 进度反馈:通过Stream提供实时进度通知(示例中简化为print)
  • 优雅关闭:添加stop()方法关闭StreamController并终止进行中任务
  • 内存优化:定期清理已完成任务的Completer引用
  • 动态调参:根据系统负载自动调整maxConcurrency

常见错误

  • 内存泄漏:未清理_completers列表导致OOM
  • 僵尸Isolate:未处理Isolate异常导致进程挂起
  • 背压失控:生产者速度持续超过消费能力导致崩溃
  • 错误吞噬:未正确传播隔离区异常

扩展知识

  • Worker Pool模式:预创建Isolate池减少启动开销
  • Stream背压策略
    • StreamController(onListen)控制数据推送速率
    • 使用StreamTransformer实现更复杂的流量控制
  • 替代方案
    • package:pool提供资源池实现
    • package:stream_channel用于跨Isolate通信