题目
如何用Redis实现可靠的分布式锁?
信息
- 类型:问答
- 难度:⭐⭐
考点
分布式锁原理,Redis原子操作,锁续期机制,高可用设计
快速回答
实现Redis分布式锁的核心要点:
- 使用
SET key value NX PX timeout原子命令创建锁 - 设置唯一客户端标识(UUID)作为value,确保只能由加锁者解锁
- 引入锁续期机制(看门狗)处理业务超时
- 通过Lua脚本保证解锁操作的原子性
- 考虑Redis集群故障转移场景下的安全性
一、核心实现原理
分布式锁需满足:互斥性(同一时刻仅一个客户端持有锁)、安全性(仅加锁者可解锁)、容错性(Redis节点故障时仍可用)。Redis通过单线程特性+原子操作实现互斥性。
二、基础实现代码示例(Python)
import redis
import uuid
class RedisLock:
def __init__(self, redis_client, lock_key, expire_time=30000):
self.redis = redis_client
self.lock_key = lock_key
self.expire_time = expire_time # 毫秒
self.identifier = str(uuid.uuid4()) # 唯一标识
def acquire(self):
# 原子操作:SET if Not eXists + 过期时间
result = self.redis.set(
self.lock_key,
self.identifier,
nx=True,
px=self.expire_time
)
return result is True
def release(self):
# 使用Lua脚本保证原子性校验+删除
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
return self.redis.eval(script, 1, self.lock_key, self.identifier) == 1三、关键问题解决方案
1. 锁续期(看门狗机制)
当业务执行时间超过锁过期时间时,需自动续期:
def start_watchdog(self):
def renew():
while self.lock_held:
# 每10秒续期一次(过期时间的1/3)
self.redis.pexpire(self.lock_key, self.expire_time)
time.sleep(self.expire_time / 3000) # 毫秒转秒
threading.Thread(target=renew, daemon=True).start()2. 集群故障转移问题
Redis主从切换可能导致锁丢失(Redlock算法解决):
- 向N个独立Redis节点顺序请求锁
- 当获得超过半数的锁且总耗时小于锁有效期时成功
- 客户端计算锁获取时间需小于锁有效时间
四、最佳实践
- 超时设置:过期时间 = 业务最大预估时间 + 缓冲时间(如1.5倍)
- 重试策略:采用指数退避重试(Exponential Backoff)避免集群雪崩
- 锁命名规范:
lock:{业务}:{资源ID}(如lock:order:pay:12345) - 监控:通过Redis的
INFO stats监控锁竞争情况
五、常见错误
| 错误类型 | 后果 | 正确方案 |
|---|---|---|
| 未设置唯一标识 | 其他客户端可能误删锁 | 使用UUID作为value |
| 未用原子操作解锁 | 并发场景可能误删他人锁 | 使用Lua脚本保证校验+删除原子性 |
| 忽略业务超时 | 锁提前释放导致数据不一致 | 实现看门狗续期机制 |
| 单节点部署 | 主从切换后锁失效 | 关键业务使用Redlock算法 |
六、扩展知识
- Redisson框架:Java客户端提供自动续期、可重入锁等高级特性
- 锁粒度优化:细粒度锁(如按用户ID分段)提升并发性能
- 替代方案对比:Zookeeper(CP系统)适合强一致性场景,Redis(AP系统)性能更高
- Redis 7.0新特性:
SET命令支持PXAT(毫秒级时间戳过期)更精准控制