侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1824 篇文章
  • 累计收到 0 条评论

设计一个并发安全的优先级任务调度器

2025-12-11 / 0 评论 / 7 阅读

题目

设计一个并发安全的优先级任务调度器

信息

  • 类型:问答
  • 难度:⭐⭐⭐

考点

Channel高级应用,Select语句,并发控制,优先级调度,资源管理

快速回答

实现一个并发安全的优先级任务调度器需要解决以下核心问题:

  • 使用带缓冲的channel接收不同优先级的任务
  • 通过select实现优先级调度(高优先级任务优先执行)
  • 使用goroutine池控制最大并发数
  • 实现任务超时和取消机制
  • 处理调度器优雅关闭
## 解析

原理说明

优先级调度器的核心挑战在于:当多个优先级的任务同时到达时,确保高优先级任务优先执行。Go的select语句在多个channel就绪时随机选择,无法直接满足优先级需求。解决方案是:

  1. 为每个优先级创建独立channel(高/中/低)
  2. 使用优先级检查循环替代多case select
  3. 通过带缓冲channel实现并发控制(信号量模式)
  4. 结合context实现超时和取消

代码实现

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Priority int

const (
    High Priority = iota
    Medium
    Low
)

type Task struct {
    ID       int
    Priority Priority
    Payload  func()
}

type Scheduler struct {
    highChan   chan Task
    mediumChan chan Task
    lowChan    chan Task
    semaphore  chan struct{} // 并发控制信号量
    ctx        context.Context
    cancel     context.CancelFunc
    wg         sync.WaitGroup
}

func NewScheduler(concurrency int) *Scheduler {
    ctx, cancel := context.WithCancel(context.Background())
    return &Scheduler{
        highChan:   make(chan Task, 100),
        mediumChan: make(chan Task, 100),
        lowChan:    make(chan Task, 100),
        semaphore:  make(chan struct{}, concurrency),
        ctx:        ctx,
        cancel:     cancel,
    }
}

func (s *Scheduler) Start() {
    s.wg.Add(1)
    go func() {
        defer s.wg.Done()
        for {
            // 优先级检查循环(高→中→低)
            select {
            case task := <-s.highChan:
                s.processTask(task)
                continue // 优先处理高优先级
            default:
            }

            select {
            case task := <-s.highChan:
                s.processTask(task)
            case task := <-s.mediumChan:
                s.processTask(task)
            case task := <-s.lowChan:
                s.processTask(task)
            case <-s.ctx.Done():
                return
            }
        }
    }()
}

func (s *Scheduler) processTask(task Task) {
    select {
    case s.semaphore <- struct{}{}: // 获取信号量
        s.wg.Add(1)
        go func() {
            defer func() {
                <-s.semaphore // 释放信号量
                s.wg.Done()
            }()

            // 带超时执行
            ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
            defer cancel()

            done := make(chan struct{})
            go func() {
                task.Payload()
                close(done)
            }()

            select {
            case <-done:
            case <-ctx.Done():
                fmt.Printf("Task %d timed out\n", task.ID)
            }
        }()
    case <-s.ctx.Done():
        return
    }
}

func (s *Scheduler) Submit(task Task) error {
    select {
    case <-s.ctx.Done():
        return fmt.Errorf("scheduler stopped")
    default:
    }

    switch task.Priority {
    case High:
        s.highChan <- task
    case Medium:
        s.mediumChan <- task
    case Low:
        s.lowChan <- task
    }
    return nil
}

func (s *Scheduler) Stop() {
    s.cancel()     // 通知所有goroutine停止
    s.wg.Wait()    // 等待所有任务完成
    close(s.highChan)
    close(s.mediumChan)
    close(s.lowChan)
}

// 使用示例
func main() {
    scheduler := NewScheduler(3) // 最大并发数3
    scheduler.Start()

    // 提交混合优先级任务
    for i := 0; i < 10; i++ {
        priority := Priority(i % 3)
        task := Task{
            ID:       i,
            Priority: priority,
            Payload: func(id int) func() {
                return func() {
                    time.Sleep(1 * time.Second)
                    fmt.Printf("Executing task %d (Priority: %d)\n", id, priority)
                }
            }(i),
        }
        scheduler.Submit(task)
    }

    time.Sleep(3 * time.Second)
    scheduler.Stop()
}

最佳实践

  • 优先级实现:使用两级select,先检查高优先级channel,再检查所有channel
  • 并发控制:带缓冲channel作为计数信号量(semaphore模式)
  • 优雅停止:context通知停止 + WaitGroup等待任务完成
  • 资源隔离:不同优先级使用独立channel避免饥饿
  • 超时处理:context.WithTimeout确保任务不会永久阻塞

常见错误

  • 优先级反转:直接使用多case select导致随机执行(Go运行时随机选择就绪channel)
  • goroutine泄露:未正确处理Stop导致goroutine泄漏
  • 任务饥饿:高优先级任务持续涌入导致低优先级永远得不到执行(需设置优先级配额)
  • 并发失控:未限制最大goroutine数量导致资源耗尽
  • 通道阻塞:未处理channel满时的提交策略(如本例直接阻塞等待)

扩展知识

  • 动态优先级调整:可通过反馈机制动态调整任务优先级
  • 任务抢占:Go不支持强制抢占,可通过context取消低优先级任务
  • 公平调度:在优先级循环中加入权重或轮转机制防止饥饿
  • 批量处理:使用chan []Task提高吞吐量
  • 分布式扩展:结合消息队列(如NSQ/Kafka)实现跨进程调度