题目
设计支持任务取消和超时的异步任务调度器
信息
- 类型:问答
- 难度:⭐⭐⭐
考点
Future取消机制, 超时处理, 资源生命周期管理, 并发任务调度, Pin与自引用结构
快速回答
实现一个异步任务调度器需要解决以下核心问题:
- 使用
tokio::sync::Mutex保护任务状态 - 通过
tokio::task::AbortHandle实现任务取消 - 结合
tokio::time::timeout处理任务超时 - 使用
Pin<Box<dyn Future>>存储异构任务 - 设计
Drop实现确保资源清理
问题场景
在分布式系统中,经常需要管理大量异步任务的执行,要求:1) 支持主动取消任务 2) 自动处理执行超时 3) 避免资源泄漏。本设计需要处理任务取消与超时的竞态条件,并保证自引用结构的内存安全。
核心实现方案
use tokio::{sync::Mutex, time::{timeout, Duration}};
use std::{collections::HashMap, sync::Arc, pin::Pin, future::Future};
struct AsyncScheduler {
tasks: Mutex<HashMap<TaskId, AbortHandle>>,
}
impl AsyncScheduler {
async fn add_task<F, T>(&self, id: TaskId, task: F, max_duration: Duration) -> Result<T, TaskError>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let (task, abort_handle) = tokio::task::spawn(task).abort_handle();
// 存储取消句柄
self.tasks.lock().await.insert(id, abort_handle);
// 执行带超时的任务
match timeout(max_duration, task).await {
Ok(Ok(res)) => Ok(res),
Ok(Err(_)) => Err(TaskError::Cancelled), // 任务被取消
Err(_) => {
self.cancel(id).await; // 超时后主动取消
Err(TaskError::Timeout)
}
}
}
async fn cancel(&self, id: TaskId) {
if let Some(handle) = self.tasks.lock().await.remove(&id) {
handle.abort();
}
}
}关键难点解析
1. Future取消机制
使用AbortHandle.abort()触发取消时:
- 任务线程收到取消信号后立即终止
- 正在执行的
.await点会返回Poll::Ready(Err(Cancelled)) - 必须处理
JoinError::is_cancelled()判断取消状态
2. 超时与取消的竞态处理
// 正确处理双重取消
match timeout(dura, task).await {
Err(_) => { /* 超时处理 */ },
Ok(Err(join_err)) if join_err.is_cancelled() => { /* 手动取消 */ },
// ...
}3. 自引用结构的内存安全
当任务持有自身引用时:
struct SelfReferentialTask {
data: String,
pointer: *const String, // 指向data的指针
}
impl Future for SelfReferentialTask {
// 必须使用Pin保证移动时不失效
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// 访问pointer需要unsafe
unsafe { /* 使用NonNull优化 */ }
}
}最佳实践
- 资源清理:在
Drop实现中自动取消所有任务 - 错误处理:区分超时错误(
TaskError::Timeout)和主动取消(TaskError::Cancelled) - 内存安全:对自引用结构使用
Pin<Box<T>>固定内存位置 - 并发控制:使用
tokio::sync::Semaphore限制最大并发数
常见错误
- 取消后未清理:忘记从
HashMap移除已完成任务导致内存泄漏 - 双重取消:超时和手动取消同时触发时未正确处理
- 移动自引用结构:未使用
Pin导致悬垂指针 - 阻塞运行时:在异步任务中调用同步阻塞代码
扩展知识
- 结构化并发:使用
tokio-scoped或async-task-group管理任务生命周期 - 取消安全:实现
Drop时确保资源释放(如关闭网络连接) - 性能优化:使用
ArcSwap替代Mutex减少锁争用 - 跨线程安全:通过
Send + Sync约束确保调度器线程安全