题目
设计一个支持任务依赖、超时控制、错误传播和资源限制的协程任务处理器
信息
- 类型:问答
- 难度:⭐⭐⭐
考点
结构化并发, 协程上下文管理, 异常传播机制, 资源限制策略, 复杂状态管理
快速回答
实现要点:
- 使用
CoroutineScope实现结构化并发生命周期管理 - 通过
SupervisorJob和自定义CoroutineExceptionHandler处理错误 - 使用
Semaphore或withContext(Dispatchers.IO.limitedParallelism(N))限制并发资源 - 利用
async/await和Deferred处理任务依赖关系 - 通过
withTimeout和withTimeoutOrNull实现超时控制 - 使用
Mutex保护共享状态
问题场景
在复杂系统中需要处理:1) 任务间依赖关系 2) 单个任务超时控制 3) 错误传播与恢复 4) 系统资源限制(如最大并发数)。要求设计一个协程任务处理器,满足:
- 任务可声明依赖的其他任务
- 单个任务超时自动取消且不影响其他任务
- 错误精确传播到调用方
- 控制最大并发协程数量
核心实现方案
class TaskProcessor(maxConcurrency: Int) {
private val scope = CoroutineScope(
SupervisorJob() +
Dispatchers.Default +
CoroutineExceptionHandler { _, e ->
/* 集中处理未捕获异常 */ }
)
// 资源限制方案1:使用Semaphore
private val semaphore = Semaphore(maxConcurrency)
// 资源限制方案2(Kotlin 1.6+)
private val limitedDispatcher = Dispatchers.IO.limitedParallelism(maxConcurrency)
// 任务状态跟踪
private val taskRegistry = mutableMapOf<String, Deferred<*>>()
private val mutex = Mutex()
suspend fun submitTask(
taskId: String,
dependsOn: List<String> = emptyList(),
block: suspend () -> Unit
): Deferred<Unit> = coroutineScope {
// 检查依赖是否存在
val dependencies = dependsOn.map { id ->
mutex.withLock { taskRegistry[id] ?: throw IllegalArgumentException("Missing dependency: $id") }
}
// 等待前置任务完成
dependencies.forEach { it.await() }
// 方案1:使用Semaphore限制资源
val task = scope.async {
semaphore.withPermit {
withTimeout(5000) { // 单个任务超时控制
block()
}
}
}
// 方案2:使用定制Dispatcher
/*
val task = scope.async(limitedDispatcher) {
withTimeout(5000) {
block()
}
}
*/
mutex.withLock { taskRegistry[taskId] = task }
task
}
fun shutdown() {
scope.cancel("Processor shutdown")
}
}关键设计原理
- 结构化并发:通过自定义CoroutineScope管理所有任务生命周期,shutdown()调用可取消所有任务
- 错误隔离:SupervisorJob确保单个任务失败不会影响其他任务
- 依赖处理:通过Deferred.await()实现任务依赖链,前置任务失败会传播异常
- 资源限制:Semaphore或limitedParallelism控制最大并发协程数
- 线程安全:Mutex保护共享的taskRegistry状态
最佳实践
- 使用
SupervisorJob而非常规Job实现错误隔离 - 为耗时任务指定
withTimeout,避免无限阻塞 - 通过
CoroutineExceptionHandler集中处理未捕获异常 - 使用
limitedParallelism(Kotlin 1.6+)实现更高效的资源限制 - 暴露
shutdown()方法实现优雅停止
常见错误
- 错误:使用普通Job导致一个任务失败取消所有任务
修复:改用SupervisorJob - 错误:未处理依赖任务抛出的异常
修复:在dependencies.await()外添加异常处理 - 错误:在Semaphore中未使用
withPermit导致许可证泄露
修复:确保在finally块释放资源 - 错误:未保护共享状态taskRegistry
修复:使用Mutex或@Volatile保证可见性
扩展知识
- 任务优先级:可结合
Semaphore和优先级队列实现 - 任务重试:使用
retry库或自定义重试逻辑 - 状态持久化:通过
Flow暴露任务状态流 - 取消传播:在任务块中定期检查
isActive或调用ensureActive() - 监控集成:通过
CoroutineContext注入监控指标