题目
设计一个并发安全的优先级任务调度器
信息
- 类型:问答
- 难度:⭐⭐⭐
考点
Channel高级应用,Select语句,并发控制,优先级调度,资源管理
快速回答
实现一个并发安全的优先级任务调度器需要解决以下核心问题:
- 使用带缓冲的channel接收不同优先级的任务
- 通过select实现优先级调度(高优先级任务优先执行)
- 使用goroutine池控制最大并发数
- 实现任务超时和取消机制
- 处理调度器优雅关闭
原理说明
优先级调度器的核心挑战在于:当多个优先级的任务同时到达时,确保高优先级任务优先执行。Go的select语句在多个channel就绪时随机选择,无法直接满足优先级需求。解决方案是:
- 为每个优先级创建独立channel(高/中/低)
- 使用优先级检查循环替代多case select
- 通过带缓冲channel实现并发控制(信号量模式)
- 结合context实现超时和取消
代码实现
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Priority int
const (
High Priority = iota
Medium
Low
)
type Task struct {
ID int
Priority Priority
Payload func()
}
type Scheduler struct {
highChan chan Task
mediumChan chan Task
lowChan chan Task
semaphore chan struct{} // 并发控制信号量
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewScheduler(concurrency int) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{
highChan: make(chan Task, 100),
mediumChan: make(chan Task, 100),
lowChan: make(chan Task, 100),
semaphore: make(chan struct{}, concurrency),
ctx: ctx,
cancel: cancel,
}
}
func (s *Scheduler) Start() {
s.wg.Add(1)
go func() {
defer s.wg.Done()
for {
// 优先级检查循环(高→中→低)
select {
case task := <-s.highChan:
s.processTask(task)
continue // 优先处理高优先级
default:
}
select {
case task := <-s.highChan:
s.processTask(task)
case task := <-s.mediumChan:
s.processTask(task)
case task := <-s.lowChan:
s.processTask(task)
case <-s.ctx.Done():
return
}
}
}()
}
func (s *Scheduler) processTask(task Task) {
select {
case s.semaphore <- struct{}{}: // 获取信号量
s.wg.Add(1)
go func() {
defer func() {
<-s.semaphore // 释放信号量
s.wg.Done()
}()
// 带超时执行
ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
task.Payload()
close(done)
}()
select {
case <-done:
case <-ctx.Done():
fmt.Printf("Task %d timed out\n", task.ID)
}
}()
case <-s.ctx.Done():
return
}
}
func (s *Scheduler) Submit(task Task) error {
select {
case <-s.ctx.Done():
return fmt.Errorf("scheduler stopped")
default:
}
switch task.Priority {
case High:
s.highChan <- task
case Medium:
s.mediumChan <- task
case Low:
s.lowChan <- task
}
return nil
}
func (s *Scheduler) Stop() {
s.cancel() // 通知所有goroutine停止
s.wg.Wait() // 等待所有任务完成
close(s.highChan)
close(s.mediumChan)
close(s.lowChan)
}
// 使用示例
func main() {
scheduler := NewScheduler(3) // 最大并发数3
scheduler.Start()
// 提交混合优先级任务
for i := 0; i < 10; i++ {
priority := Priority(i % 3)
task := Task{
ID: i,
Priority: priority,
Payload: func(id int) func() {
return func() {
time.Sleep(1 * time.Second)
fmt.Printf("Executing task %d (Priority: %d)\n", id, priority)
}
}(i),
}
scheduler.Submit(task)
}
time.Sleep(3 * time.Second)
scheduler.Stop()
}
最佳实践
- 优先级实现:使用两级select,先检查高优先级channel,再检查所有channel
- 并发控制:带缓冲channel作为计数信号量(semaphore模式)
- 优雅停止:context通知停止 + WaitGroup等待任务完成
- 资源隔离:不同优先级使用独立channel避免饥饿
- 超时处理:context.WithTimeout确保任务不会永久阻塞
常见错误
- 优先级反转:直接使用多case select导致随机执行(Go运行时随机选择就绪channel)
- goroutine泄露:未正确处理Stop导致goroutine泄漏
- 任务饥饿:高优先级任务持续涌入导致低优先级永远得不到执行(需设置优先级配额)
- 并发失控:未限制最大goroutine数量导致资源耗尽
- 通道阻塞:未处理channel满时的提交策略(如本例直接阻塞等待)
扩展知识
- 动态优先级调整:可通过反馈机制动态调整任务优先级
- 任务抢占:Go不支持强制抢占,可通过context取消低优先级任务
- 公平调度:在优先级循环中加入权重或轮转机制防止饥饿
- 批量处理:使用
chan []Task提高吞吐量 - 分布式扩展:结合消息队列(如NSQ/Kafka)实现跨进程调度