Redis 分布式锁实现笔记(Python)
1. 基本概念
• 分布式锁:在分布式系统中协调多个进程/服务对共享资源的访问 • Redis 锁特性: • 互斥性:同一时刻只有一个客户端能持有锁 • 避免死锁:锁必须有超时机制 • 容错性:Redis 节点宕机时仍能正常运作
2. 简单实现方案
2.1 基础实现代码
python
import redis
import time
import uuid
class RedisLock:
def __init__(self, redis_client, lock_name, expire_time=30):
"""
:param redis_client: Redis 客户端连接
:param lock_name: 锁名称
:param expire_time: 锁自动过期时间(秒)
"""
self.redis = redis_client
self.lock_name = f"lock:{lock_name}" # 添加前缀避免冲突
self.expire_time = expire_time
self.identifier = str(uuid.uuid4()) # 唯一标识符
def acquire(self, timeout=10):
"""
获取锁
:param timeout: 获取锁的超时时间(秒)
:return: bool 是否成功获取锁
"""
end = time.time() + timeout
while time.time() < end:
# SET key value NX EX seconds
if self.redis.set(
self.lock_name,
self.identifier,
nx=True,
ex=self.expire_time
):
return True
time.sleep(0.01) # 适当等待避免CPU空转
return False
def release(self):
"""
释放锁(使用Lua脚本保证原子性)
"""
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.redis.eval(lua_script, 1, self.lock_name, self.identifier)
# 实现上下文管理器协议 进入上下文时调用 (获取锁)
def __enter__(self):
if not self.acquire():
raise Exception(f"Failed to acquire lock {self.lock_name}")
return self
# 实现上下文管理器协议 退出上下文时调用 (释放锁)
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()2.2 使用示例
python
# 创建Redis连接
r = redis.Redis(host='localhost', port=6379, db=0)
# 使用方式1:with语句(推荐)
with RedisLock(r, "order_123"):
# 执行需要加锁的业务逻辑
print("处理订单123...")
# 使用方式2:手动获取释放
lock = RedisLock(r, "inventory_456")
try:
if lock.acquire(timeout=5):
print("修改库存456...")
else:
print("获取锁超时")
finally:
lock.release()3. 高级实现方案
3.1 Redlock 算法(分布式环境)
python
from redlock import Redlock
# 初始化(连接多个Redis节点)
dlm = Redlock([
{"host": "redis1.example.com", "port": 6379},
{"host": "redis2.example.com", "port": 6379},
{"host": "redis3.example.com", "port": 6379},
])
# 获取锁(有效期10秒)
lock = dlm.lock("global_resource", 10000)
if lock:
try:
print("处理全局资源...")
finally:
dlm.unlock(lock)
else:
print("获取分布式锁失败")3.2 锁续期机制
python
class RedisLockWithRenewal(RedisLock):
def __init__(self, *args, renewal_interval=10, **kwargs):
super().__init__(*args, **kwargs)
self.renewal_interval = renewal_interval
self._stop_renewal = False
def _renew_lock(self):
"""后台线程续期锁"""
while not self._stop_renewal:
time.sleep(self.renewal_interval)
self.redis.expire(self.lock_name, self.expire_time)
def __enter__(self):
if not self.acquire():
raise Exception("Failed to acquire lock")
# 启动续期线程
self._stop_renewal = False
import threading
threading.Thread(
target=self._renew_lock,
daemon=True
).start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._stop_renewal = True
self.release()4. 关键注意事项
原子性操作: • 获取锁必须使用
SET key value NX EX seconds命令组合 • 释放锁必须使用 Lua 脚本保证原子性锁命名规范: • 使用业务相关的前缀(如
order_lock:{order_id}) • 避免使用通用名称导致冲突超时时间设置: • 过期时间应大于业务处理时间但不宜过长 • 典型值:30-60秒(根据业务调整)
异常处理: • 网络问题可能导致锁状态不一致 • 建议添加重试机制和监控告警
性能考量: • 避免频繁争抢锁(可加入随机等待) • 热点键问题可考虑分片
5. 常见问题解决方案
问题1:锁提前过期 • 现象:业务未完成但锁已释放 • 解决方案:实现锁续期机制(如3.2示例)
问题2:误释放他人锁 • 现象:A客户端释放了B客户端的锁 • 解决方案:必须使用唯一标识符验证(已在基础实现中包含)
问题3:Redis单点故障 • 现象:单Redis节点宕机导致锁服务不可用 • 解决方案:使用Redis哨兵/集群或Redlock算法
6. 生产环境建议
监控指标: • 锁获取成功率 • 平均等待时间 • 锁持有时间分布
日志记录:
python
import logging
logging.basicConfig(level=logging.INFO)
class LoggingRedisLock(RedisLock):
def acquire(self, timeout=10):
logging.info(f"Attempting to acquire lock {self.lock_name}")
result = super().acquire(timeout)
logging.info(f"Lock {'obtained' if result else 'failed'}")
return result- 熔断机制: • 当锁服务不可用时应有降级方案 • 例如本地锁+Redis锁的混合模式
7. 扩展阅读
- Redis官方分布式锁文档:https://redis.io/docs/manual/patterns/distributed-locks/
- Redlock算法论文:https://redis.io/topics/distlock
- 分布式系统协调服务对比(Redis/ZooKeeper/etcd)