侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

设计一个并发安全的动态资源池,支持超时获取和优雅关闭

2025-12-12 / 0 评论 / 4 阅读

题目

设计一个并发安全的动态资源池,支持超时获取和优雅关闭

信息

  • 类型:问答
  • 难度:⭐⭐⭐

考点

并发控制,通道高级用法,context使用,资源生命周期管理,竞态条件处理

快速回答

实现要点:

  • 使用带缓冲通道管理资源
  • sync.Mutex保护共享状态
  • context处理超时和取消
  • sync.Once实现优雅关闭
  • 动态调整需原子操作
## 解析

问题场景

在高并发系统中,需要管理数据库连接等昂贵资源。要求实现:

  1. 并发安全的获取/释放资源
  2. 支持获取超时(context)
  3. 运行时动态调整池大小
  4. 优雅关闭(等待所有资源归还)

核心实现

type ResourcePool struct {
    resources chan Resource
    mu        sync.Mutex
    size      int32
    closed    bool
    closeOnce sync.Once
    closeChan chan struct{}
}

func NewPool(size int) *ResourcePool {
    p := &ResourcePool{
        resources: make(chan Resource, size),
        closeChan: make(chan struct{}),
    }
    atomic.StoreInt32(&p.size, int32(size))
    // 初始化资源...
    return p
}

// 获取资源(带超时)
func (p *ResourcePool) Get(ctx context.Context) (Resource, error) {
    select {
    case res := <-p.resources:
        return res, nil
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
        // 动态创建新资源(需考虑上限)
        p.mu.Lock()
        currentSize := atomic.LoadInt32(&p.size)
        if len(p.resources) == 0 && cap(p.resources) < int(currentSize) {
            // 创建新资源
            newRes := createResource()
            p.mu.Unlock()
            return newRes, nil
        }
        p.mu.Unlock()

        // 等待现有资源
        select {
        case res := <-p.resources:
            return res, nil
        case <-ctx.Done():
            return nil, ctx.Err()
        }
    }
}

// 释放资源
func (p *ResourcePool) Put(res Resource) {
    p.mu.Lock()
    defer p.mu.Unlock()

    if p.closed {
        res.Close() // 立即销毁
        return
    }

    select {
    case p.resources <- res: // 放回池中
    default:
        res.Close() // 池已满则销毁
    }
}

// 动态调整大小
func (p *ResourcePool) Resize(newSize int) {
    p.mu.Lock()
    defer p.mu.Unlock()

    oldCap := cap(p.resources)
    if newSize == oldCap {
        return
    }

    newChan := make(chan Resource, newSize)
    close(p.closeChan) // 通知旧通道停止使用

    // 迁移现有资源
    for res := range p.resources {
        select {
        case newChan <- res:
        default:
            res.Close() // 新池满则销毁
        }
    }

    p.resources = newChan
    p.closeChan = make(chan struct{})
    atomic.StoreInt32(&p.size, int32(newSize))
}

// 优雅关闭
func (p *ResourcePool) Close() {
    p.closeOnce.Do(func() {
        p.mu.Lock()
        p.closed = true
        close(p.resources)
        for res := range p.resources {
            res.Close()
        }
        p.mu.Unlock()
    })
}

关键难点解析

  • 动态调整:通过原子操作保证size可见性,迁移时需双重检查状态
  • 超时控制:context与select配合实现多级超时
  • 关闭安全:sync.Once确保关闭只执行一次,关闭时处理残留资源
  • 资源竞争:Put/Get/Resize操作需用mutex保护共享状态

最佳实践

  1. 使用cap(channel)获取当前容量而非依赖size变量
  2. 资源创建应放在锁外避免阻塞全局操作
  3. 关闭时先标记状态,防止新请求进入
  4. 监控资源利用率指导动态调整

常见错误

错误后果解决方案
未处理创建资源失败资源泄漏加入重试机制和错误返回
调整大小时未同步状态数据竞争使用原子操作+互斥锁双重保护
关闭后Put导致panic程序崩溃关闭标记检查

扩展知识

  • 漏桶限流:结合time.Ticker实现QPS控制
  • 健康检查:goroutine定期验证资源有效性
  • 指标暴露:通过expvar监控池状态
  • 优先级获取:使用多个channel实现优先级队列