侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

实现一个支持并发任务的线程安全资源池

2025-12-8 / 0 评论 / 5 阅读

题目

实现一个支持并发任务的线程安全资源池

信息

  • 类型:问答
  • 难度:⭐⭐

考点

并发编程,上下文管理器,线程安全,资源管理

快速回答

实现线程安全的资源池需要:

  • 使用threading.Lock保证线程安全
  • 通过__enter____exit__实现上下文管理器
  • 使用队列管理资源
  • 处理资源获取超时和异常
## 解析

问题背景

在高并发场景中,频繁创建/销毁数据库连接、网络连接等资源会带来巨大开销。资源池通过复用资源提高性能,需满足:

  • 线程安全:多线程同时获取资源时不冲突
  • 阻塞控制:资源不足时合理等待
  • 异常处理:资源失效时自动替换

核心实现

import threading
import queue
import time

class ResourcePool:
    def __init__(self, create_resource, max_size=5, timeout=10):
        self._create_resource = create_resource
        self._pool = queue.Queue(max_size)
        self._lock = threading.Lock()
        self.timeout = timeout
        # 初始化资源
        for _ in range(max_size):
            self._pool.put(create_resource())

    def get_resource(self):
        """获取资源,支持超时"""
        try:
            # 非阻塞获取,避免永久等待
            return self._pool.get(block=True, timeout=self.timeout)
        except queue.Empty:
            raise TimeoutError("获取资源超时")

    def return_resource(self, resource):
        """归还资源"""
        with self._lock:  # 保证线程安全
            if self._pool.full():
                self._close_resource(resource)  # 池满则销毁
            else:
                self._pool.put(resource)

    def _close_resource(self, resource):
        """模拟资源关闭"""
        if hasattr(resource, 'close'):
            resource.close()

    def __enter__(self):
        """上下文管理器入口"""
        return self.get_resource()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """上下文管理器出口"""
        if exc_type is None:
            self.return_resource(self._current_resource)
        else:
            self._close_resource(self._current_resource)  # 异常时销毁
        return False  # 不抑制异常

# 使用示例
if __name__ == "__main__":
    # 模拟数据库连接
    def create_db_conn():
        return {"id": id({}), "status": "active"}

    pool = ResourcePool(create_db_conn, max_size=2)

    # 正确使用方式(上下文管理器)
    with pool as conn:
        print(f"使用连接: {conn['id']}")
        # 模拟操作
        time.sleep(0.1)

    # 错误示例:忘记归还
    conn = pool.get_resource()
    print(f"获取连接: {conn['id']}")
    # 忘记调用 return_resource → 资源泄漏

关键设计点

  • 线程安全:使用threading.Lock保护共享队列
  • 阻塞控制queue.Queue内置阻塞机制,通过timeout避免死锁
  • 上下文管理器:通过__enter__/__exit__确保资源释放
  • 异常处理:操作中发生异常时,在__exit__中销毁资源

最佳实践

  1. 始终使用with语句确保资源归还
  2. 设置合理的池大小和超时时间
  3. 实现资源健康检查(扩展return_resource逻辑)
  4. 对于失效资源(如断开连接),在归还时主动销毁

常见错误

  • 资源泄漏:获取资源后未归还
  • 死锁:池大小不足且无超时机制
  • 线程冲突:未加锁导致状态不一致
  • 脏资源:归还前未重置资源状态

扩展知识

  • 连接池增强:添加ping()方法验证资源有效性
  • 异步支持:使用asyncio.Queue实现协程池
  • 对象生命周期:结合weakref跟踪资源引用
  • 标准库参考multiprocessing.pool.Pool的设计思想