题目
设计高并发任务处理器:实现带流量控制和错误恢复的异步任务队列
信息
- 类型:问答
- 难度:⭐⭐⭐
考点
Future高级用法, Stream控制流, Isolate并发处理, 错误恢复机制, 背压(Backpressure)管理
快速回答
实现要点:
- 使用
StreamController创建任务队列和完成通知流 - 通过
Completer和Future.wait实现并发控制 - 利用
Isolate.run执行CPU密集型任务 - 错误处理:
- 任务重试机制(指数退避)
- 全局错误捕获
- 隔离区错误传递
- 背压管理:
- 动态调整并发度
- 队列积压预警
问题场景
在需要处理10,000+个耗时任务的场景中(如批量图片处理、数据加密等),直接使用Future.wait会导致内存溢出。需要实现:
- 动态并发控制(最大并行任务数)
- 任务失败自动重试(带退避策略)
- 实时进度报告
- 资源隔离(CPU密集型任务)
- 背压处理(队列积压预警)
核心实现方案
class TaskProcessor {
final int maxConcurrency;
final _controller = StreamController<Task>();
final _completers = <Completer>[];
int _activeCount = 0;
int _completed = 0;
TaskProcessor({this.maxConcurrency = 4});
// 添加任务到队列
void addTask(Task task) {
_controller.add(task);
if (_controller.stream.length > maxConcurrency * 2) {
print('警告:任务积压超过阈值!');
}
}
// 启动处理器
Future<void> run() async {
await for (final task in _controller.stream) {
// 背压控制:当活跃任务达到上限时等待
while (_activeCount >= maxConcurrency) {
await Future.delayed(Duration(milliseconds: 100));
}
_activeCount++;
final completer = Completer();
_completers.add(completer);
// 使用Isolate执行CPU密集型任务
unawaited(_executeInIsolate(task, completer));
}
}
Future<void> _executeInIsolate(Task task, Completer completer) async {
int retryCount = 0;
const maxRetries = 3;
Future<void> runTask() async {
try {
// 使用Isolate.run避免阻塞主事件循环
await Isolate.run(() => task.process());
completer.complete();
} catch (e, st) {
if (retryCount < maxRetries) {
retryCount++;
final delay = Duration(seconds: 1 << retryCount); // 指数退避
await Future.delayed(delay);
await runTask(); // 递归重试
} else {
completer.completeError(TaskFailure(task.id, e, st));
}
} finally {
_activeCount--;
_completed++;
print('进度: ${_completed}/${_controller.stream.length}');
}
}
await runTask();
}
// 等待所有任务完成
Future<void> waitDone() => Future.wait(_completers.map((c) => c.future));
}
// 使用示例
void main() async {
final processor = TaskProcessor(maxConcurrency: 5);
// 添加10000个任务
for (int i = 0; i < 10000; i++) {
processor.addTask(ImageCompressTask(i));
}
// 启动处理并等待
unawaited(processor.run());
await processor.waitDone();
}关键设计原理
- 并发控制:通过
_activeCount计数器限制并行任务数,避免资源耗尽 - 背压管理:当队列长度超过
maxConcurrency*2时发出警告,动态调整生产速度 - 错误恢复:
- 指数退避重试:1s/4s/8s延迟
- 隔离区错误捕获:通过
Isolate.run捕获异常并传回主Isolate - 错误封装:使用
TaskFailure保留原始错误栈
- 资源隔离:CPU密集型任务在独立Isolate中执行,不阻塞事件循环
最佳实践
- 进度反馈:通过
Stream提供实时进度通知(示例中简化为print) - 优雅关闭:添加
stop()方法关闭StreamController并终止进行中任务 - 内存优化:定期清理已完成任务的Completer引用
- 动态调参:根据系统负载自动调整
maxConcurrency
常见错误
- 内存泄漏:未清理
_completers列表导致OOM - 僵尸Isolate:未处理Isolate异常导致进程挂起
- 背压失控:生产者速度持续超过消费能力导致崩溃
- 错误吞噬:未正确传播隔离区异常
扩展知识
- Worker Pool模式:预创建Isolate池减少启动开销
- Stream背压策略:
StreamController(onListen)控制数据推送速率- 使用
StreamTransformer实现更复杂的流量控制
- 替代方案:
package:pool提供资源池实现package:stream_channel用于跨Isolate通信