题目
并发任务执行器设计与实现
信息
- 类型:问答
- 难度:⭐⭐
考点
并发控制,context使用,错误处理,channel同步
快速回答
实现一个并发任务执行器需关注以下要点:
- 使用
sync.WaitGroup等待所有 goroutine 完成 - 通过带缓冲的 channel 控制并发数量
- 利用
context.Context实现超时/取消控制 - 采用线程安全的
sync.Mutex或 channel 收集结果 - 正确处理任务执行中的 panic 和错误
问题场景
在分布式任务调度、批量数据处理等场景中,需要安全高效地并发执行多个任务,同时满足:1) 控制最大并发数 2) 支持超时取消 3) 收集所有结果 4) 避免资源泄露。
核心实现方案
type Task func(context.Context) error
type Executor struct {
maxWorkers int
timeout time.Duration
}
func (e *Executor) Run(ctx context.Context, tasks []Task) []error {
ctx, cancel := context.WithTimeout(ctx, e.timeout)
defer cancel()
resultCh := make(chan error, len(tasks))
sem := make(chan struct{}, e.maxWorkers) // 信号量控制并发
var wg sync.WaitGroup
for _, task := range tasks {
wg.Add(1)
go func(t Task) {
defer wg.Done()
sem <- struct{}{} // 获取令牌
defer func() { <-sem }() // 释放令牌
defer func() {
if r := recover(); r != nil {
resultCh <- fmt.Errorf("panic: %v", r)
}
}()
select {
case <-ctx.Done():
resultCh <- ctx.Err()
default:
resultCh <- t(ctx)
}
}(task)
}
go func() {
wg.Wait() // 等待所有任务
close(resultCh) // 关闭结果通道
}()
results := make([]error, 0, len(tasks))
for err := range resultCh {
results = append(results, err)
}
return results
}关键原理说明
- 并发控制:通过带缓冲的 channel (
sem) 实现信号量模式,限制同时运行的 goroutine 数量 - 超时控制:
context.WithTimeout创建子上下文,通过select监听取消信号 - 结果收集:使用缓冲 channel 按完成顺序收集结果,避免锁竞争
- Panic 处理:在 defer 中 recover 防止单个任务 panic 导致整个程序崩溃
最佳实践
- 使用
context.Context传递取消信号而非直接关闭 channel - 任务函数设计为
func(context.Context) error格式以支持取消 - 通过
sync.WaitGroup确保所有资源回收后再关闭结果 channel - 结果 channel 带缓冲避免 goroutine 泄露
常见错误
- 未处理 panic 导致整个程序退出
- 忘记释放信号量造成死锁
- 在任务中阻塞未监听
ctx.Done() - 过早关闭结果 channel 引发 panic
- 未考虑任务执行时间超过总超时时间的情况
扩展知识
- 错误聚合:使用
errgroup.Group简化错误收集 - 权重控制:扩展信号量实现带权重的并发控制
- 结果排序:为任务添加 ID 标识实现结果排序
- 优雅退出:结合
os.Signal实现系统信号监听中断 - 性能优化:复用 goroutine 的 worker pool 模式减少调度开销