题目
设计支持依赖管理、超时控制、错误隔离和资源限制的协程任务调度器
信息
- 类型:问答
- 难度:⭐⭐⭐
考点
结构化并发,协程上下文管理,异常传播机制,资源限制策略,复杂状态处理
快速回答
实现要点:
- 使用
CoroutineScope+SupervisorJob实现错误隔离 - 通过
async/await处理任务依赖关系 - 采用
withTimeoutOrNull实现超时控制 - 使用
Semaphore进行并发资源限制 - 结合
CoroutineExceptionHandler定制错误处理 - 通过
select实现优先任务抢占
核心设计原理
在复杂并发场景中需协调多个关键机制:1) 结构化并发确保资源不泄露 2) 协程的父子取消传播 3) SupervisorJob实现的错误隔离 4) 协程上下文的组合与继承。任务调度器需要统一管理这些机制。
完整实现方案
class TaskScheduler(
private val maxConcurrency: Int
) {
private val scope = CoroutineScope(
SupervisorJob() +
Dispatchers.Default +
CoroutineExceptionHandler { _, e -> logError(e) }
)
private val semaphore = Semaphore(maxConcurrency)
// 任务优先级枚举
enum class Priority { HIGH, NORMAL }
suspend fun <T> scheduleTask(
task: suspend () -> T,
dependencies: List<Deferred<*>> = emptyList(),
timeout: Long? = null,
priority: Priority = Priority.NORMAL
): Deferred<T?> = scope.async {
// 1. 等待前置依赖完成
dependencies.forEach { it.await() }
// 2. 资源获取(带优先级抢占)
if (priority == Priority.HIGH) {
select<Unit> {
semaphore.acquire().onAwait { }
onTimeout(50) { throw TimeoutCancellationException("Priority resource timeout") }
}
} else {
semaphore.acquire()
}
try {
// 3. 执行带超时控制的任务
timeout?.let {
withTimeoutOrNull(it) { task() }
} ?: task()
} catch (e: Exception) {
when (e) {
is CancellationException -> println("Task canceled: ${e.message}")
else -> throw e // 异常会被CoroutineExceptionHandler捕获
}
null
} finally {
semaphore.release() // 确保资源释放
}
}
fun shutdown() = scope.cancel("Scheduler shutdown")
}关键机制解析
- 依赖管理:通过
dependencies.await()挂起保证前置任务完成 - 错误隔离:
SupervisorJob防止单个任务失败影响全局 - 资源限制:
Semaphore控制最大并发数,finally块保证释放 - 优先级抢占:
select + onTimeout实现高优先级任务快速失败 - 超时控制:
withTimeoutOrNull返回null不抛异常避免取消父协程
最佳实践
- 始终在
finally中释放资源(如数据库连接) - 对CPU密集型任务使用
Dispatchers.Default,IO任务用Dispatchers.IO - 通过
coroutineContext[Job]检查取消状态执行清理操作 - 使用
-Dkotlinx.coroutines.debug启用协程调试
常见错误
| 错误类型 | 后果 | 解决方案 |
|---|---|---|
| 未使用SupervisorJob | 子协程异常导致整个scope取消 | 显式声明错误隔离策略 |
| 在finally中阻塞 | 延迟资源释放引发死锁 | 用withContext(NonCancellable)包装清理代码 |
| 忽略协程取消 | 资源泄露或僵尸任务 | 定期检查isActive状态 |
扩展知识
- 结构化并发进阶:使用
coroutineScope{}创建子域自动传播取消 - 状态管理:通过
Mutex保护共享状态,避免用@Volatile - 性能优化:使用
-Xjvm-default=all提升挂起函数性能 - 监控集成:通过
CoroutineScopeMonitor跟踪任务生命周期