侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

并发任务执行器设计与实现

2025-12-12 / 0 评论 / 4 阅读

题目

并发任务执行器设计与实现

信息

  • 类型:问答
  • 难度:⭐⭐

考点

并发控制,context使用,错误处理,channel同步

快速回答

实现一个并发任务执行器需关注以下要点:

  • 使用 sync.WaitGroup 等待所有 goroutine 完成
  • 通过带缓冲的 channel 控制并发数量
  • 利用 context.Context 实现超时/取消控制
  • 采用线程安全的 sync.Mutex 或 channel 收集结果
  • 正确处理任务执行中的 panic 和错误
## 解析

问题场景

在分布式任务调度、批量数据处理等场景中,需要安全高效地并发执行多个任务,同时满足:1) 控制最大并发数 2) 支持超时取消 3) 收集所有结果 4) 避免资源泄露。

核心实现方案

type Task func(context.Context) error

type Executor struct {
    maxWorkers int
    timeout    time.Duration
}

func (e *Executor) Run(ctx context.Context, tasks []Task) []error {
    ctx, cancel := context.WithTimeout(ctx, e.timeout)
    defer cancel()

    resultCh := make(chan error, len(tasks))
    sem := make(chan struct{}, e.maxWorkers) // 信号量控制并发
    var wg sync.WaitGroup

    for _, task := range tasks {
        wg.Add(1)
        go func(t Task) {
            defer wg.Done()
            sem <- struct{}{}        // 获取令牌
            defer func() { <-sem }() // 释放令牌

            defer func() {
                if r := recover(); r != nil {
                    resultCh <- fmt.Errorf("panic: %v", r)
                }
            }()

            select {
            case <-ctx.Done():
                resultCh <- ctx.Err()
            default:
                resultCh <- t(ctx)
            }
        }(task)
    }

    go func() {
        wg.Wait()       // 等待所有任务
        close(resultCh) // 关闭结果通道
    }()

    results := make([]error, 0, len(tasks))
    for err := range resultCh {
        results = append(results, err)
    }
    return results
}

关键原理说明

  • 并发控制:通过带缓冲的 channel (sem) 实现信号量模式,限制同时运行的 goroutine 数量
  • 超时控制context.WithTimeout 创建子上下文,通过 select 监听取消信号
  • 结果收集:使用缓冲 channel 按完成顺序收集结果,避免锁竞争
  • Panic 处理:在 defer 中 recover 防止单个任务 panic 导致整个程序崩溃

最佳实践

  • 使用 context.Context 传递取消信号而非直接关闭 channel
  • 任务函数设计为 func(context.Context) error 格式以支持取消
  • 通过 sync.WaitGroup 确保所有资源回收后再关闭结果 channel
  • 结果 channel 带缓冲避免 goroutine 泄露

常见错误

  • 未处理 panic 导致整个程序退出
  • 忘记释放信号量造成死锁
  • 在任务中阻塞未监听 ctx.Done()
  • 过早关闭结果 channel 引发 panic
  • 未考虑任务执行时间超过总超时时间的情况

扩展知识

  • 错误聚合:使用 errgroup.Group 简化错误收集
  • 权重控制:扩展信号量实现带权重的并发控制
  • 结果排序:为任务添加 ID 标识实现结果排序
  • 优雅退出:结合 os.Signal 实现系统信号监听中断
  • 性能优化:复用 goroutine 的 worker pool 模式减少调度开销