侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

设计一个支持任务依赖、超时控制、错误传播和资源限制的协程任务处理器

2025-12-12 / 0 评论 / 4 阅读

题目

设计一个支持任务依赖、超时控制、错误传播和资源限制的协程任务处理器

信息

  • 类型:问答
  • 难度:⭐⭐⭐

考点

结构化并发, 协程上下文管理, 异常传播机制, 资源限制策略, 复杂状态管理

快速回答

实现要点:

  • 使用 CoroutineScope 实现结构化并发生命周期管理
  • 通过 SupervisorJob 和自定义 CoroutineExceptionHandler 处理错误
  • 使用 SemaphorewithContext(Dispatchers.IO.limitedParallelism(N)) 限制并发资源
  • 利用 async/awaitDeferred 处理任务依赖关系
  • 通过 withTimeoutwithTimeoutOrNull 实现超时控制
  • 使用 Mutex 保护共享状态
## 解析

问题场景

在复杂系统中需要处理:1) 任务间依赖关系 2) 单个任务超时控制 3) 错误传播与恢复 4) 系统资源限制(如最大并发数)。要求设计一个协程任务处理器,满足:

  • 任务可声明依赖的其他任务
  • 单个任务超时自动取消且不影响其他任务
  • 错误精确传播到调用方
  • 控制最大并发协程数量

核心实现方案

class TaskProcessor(maxConcurrency: Int) {
    private val scope = CoroutineScope(
        SupervisorJob() +
        Dispatchers.Default +
        CoroutineExceptionHandler { _, e -> 
            /* 集中处理未捕获异常 */ }
    )

    // 资源限制方案1:使用Semaphore
    private val semaphore = Semaphore(maxConcurrency)

    // 资源限制方案2(Kotlin 1.6+)
    private val limitedDispatcher = Dispatchers.IO.limitedParallelism(maxConcurrency)

    // 任务状态跟踪
    private val taskRegistry = mutableMapOf<String, Deferred<*>>()
    private val mutex = Mutex()

    suspend fun submitTask(
        taskId: String,
        dependsOn: List<String> = emptyList(),
        block: suspend () -> Unit
    ): Deferred<Unit> = coroutineScope {
        // 检查依赖是否存在
        val dependencies = dependsOn.map { id ->
            mutex.withLock { taskRegistry[id] ?: throw IllegalArgumentException("Missing dependency: $id") }
        }

        // 等待前置任务完成
        dependencies.forEach { it.await() }

        // 方案1:使用Semaphore限制资源
        val task = scope.async {
            semaphore.withPermit {
                withTimeout(5000) { // 单个任务超时控制
                    block()
                }
            }
        }

        // 方案2:使用定制Dispatcher
        /*
        val task = scope.async(limitedDispatcher) {
            withTimeout(5000) {
                block()
            }
        }
        */

        mutex.withLock { taskRegistry[taskId] = task }
        task
    }

    fun shutdown() {
        scope.cancel("Processor shutdown")
    }
}

关键设计原理

  • 结构化并发:通过自定义CoroutineScope管理所有任务生命周期,shutdown()调用可取消所有任务
  • 错误隔离:SupervisorJob确保单个任务失败不会影响其他任务
  • 依赖处理:通过Deferred.await()实现任务依赖链,前置任务失败会传播异常
  • 资源限制:Semaphore或limitedParallelism控制最大并发协程数
  • 线程安全:Mutex保护共享的taskRegistry状态

最佳实践

  • 使用SupervisorJob而非常规Job实现错误隔离
  • 为耗时任务指定withTimeout,避免无限阻塞
  • 通过CoroutineExceptionHandler集中处理未捕获异常
  • 使用limitedParallelism(Kotlin 1.6+)实现更高效的资源限制
  • 暴露shutdown()方法实现优雅停止

常见错误

  • 错误:使用普通Job导致一个任务失败取消所有任务
    修复:改用SupervisorJob
  • 错误:未处理依赖任务抛出的异常
    修复:在dependencies.await()外添加异常处理
  • 错误:在Semaphore中未使用withPermit导致许可证泄露
    修复:确保在finally块释放资源
  • 错误:未保护共享状态taskRegistry
    修复:使用Mutex@Volatile保证可见性

扩展知识

  • 任务优先级:可结合Semaphore和优先级队列实现
  • 任务重试:使用retry库或自定义重试逻辑
  • 状态持久化:通过Flow暴露任务状态流
  • 取消传播:在任务块中定期检查isActive或调用ensureActive()
  • 监控集成:通过CoroutineContext注入监控指标