侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

设计支持依赖管理、超时控制、错误隔离和资源限制的协程任务调度器

2025-12-11 / 0 评论 / 4 阅读

题目

设计支持依赖管理、超时控制、错误隔离和资源限制的协程任务调度器

信息

  • 类型:问答
  • 难度:⭐⭐⭐

考点

结构化并发,协程上下文管理,异常传播机制,资源限制策略,复杂状态处理

快速回答

实现要点:

  • 使用CoroutineScope+SupervisorJob实现错误隔离
  • 通过async/await处理任务依赖关系
  • 采用withTimeoutOrNull实现超时控制
  • 使用Semaphore进行并发资源限制
  • 结合CoroutineExceptionHandler定制错误处理
  • 通过select实现优先任务抢占
## 解析

核心设计原理

在复杂并发场景中需协调多个关键机制:1) 结构化并发确保资源不泄露 2) 协程的父子取消传播 3) SupervisorJob实现的错误隔离 4) 协程上下文的组合与继承。任务调度器需要统一管理这些机制。

完整实现方案

class TaskScheduler(  
    private val maxConcurrency: Int  
) {  
    private val scope = CoroutineScope(  
        SupervisorJob() +  
        Dispatchers.Default +  
        CoroutineExceptionHandler { _, e -> logError(e) }  
    )  
    private val semaphore = Semaphore(maxConcurrency)  

    // 任务优先级枚举  
    enum class Priority { HIGH, NORMAL }  

    suspend fun <T> scheduleTask(  
        task: suspend () -> T,  
        dependencies: List<Deferred<*>> = emptyList(),  
        timeout: Long? = null,  
        priority: Priority = Priority.NORMAL  
    ): Deferred<T?> = scope.async {  
        // 1. 等待前置依赖完成  
        dependencies.forEach { it.await() }  
        // 2. 资源获取(带优先级抢占)  
        if (priority == Priority.HIGH) {  
            select<Unit> {  
                semaphore.acquire().onAwait { }  
                onTimeout(50) { throw TimeoutCancellationException("Priority resource timeout") }  
            }  
        } else {  
            semaphore.acquire()  
        }  
        try {  
            // 3. 执行带超时控制的任务  
            timeout?.let {  
                withTimeoutOrNull(it) { task() }  
            } ?: task()  
        } catch (e: Exception) {  
            when (e) {  
                is CancellationException -> println("Task canceled: ${e.message}")  
                else -> throw e  // 异常会被CoroutineExceptionHandler捕获  
            }  
            null  
        } finally {  
            semaphore.release()  // 确保资源释放  
        }  
    }  
    fun shutdown() = scope.cancel("Scheduler shutdown")  
}

关键机制解析

  • 依赖管理:通过dependencies.await()挂起保证前置任务完成
  • 错误隔离SupervisorJob防止单个任务失败影响全局
  • 资源限制Semaphore控制最大并发数,finally块保证释放
  • 优先级抢占select + onTimeout实现高优先级任务快速失败
  • 超时控制withTimeoutOrNull返回null不抛异常避免取消父协程

最佳实践

  1. 始终在finally中释放资源(如数据库连接)
  2. 对CPU密集型任务使用Dispatchers.Default,IO任务用Dispatchers.IO
  3. 通过coroutineContext[Job]检查取消状态执行清理操作
  4. 使用-Dkotlinx.coroutines.debug启用协程调试

常见错误

错误类型后果解决方案
未使用SupervisorJob子协程异常导致整个scope取消显式声明错误隔离策略
在finally中阻塞延迟资源释放引发死锁withContext(NonCancellable)包装清理代码
忽略协程取消资源泄露或僵尸任务定期检查isActive状态

扩展知识

  • 结构化并发进阶:使用coroutineScope{}创建子域自动传播取消
  • 状态管理:通过Mutex保护共享状态,避免用@Volatile
  • 性能优化:使用-Xjvm-default=all提升挂起函数性能
  • 监控集成:通过CoroutineScopeMonitor跟踪任务生命周期