Skip to content

Redis 分布式锁实现笔记(Python)

1. 基本概念

分布式锁:在分布式系统中协调多个进程/服务对共享资源的访问 • Redis 锁特性: • 互斥性:同一时刻只有一个客户端能持有锁 • 避免死锁:锁必须有超时机制 • 容错性:Redis 节点宕机时仍能正常运作

2. 简单实现方案

2.1 基础实现代码

python
import redis
import time
import uuid


class RedisLock:
    def __init__(self, redis_client, lock_name, expire_time=30):
        """
        :param redis_client: Redis 客户端连接
        :param lock_name: 锁名称
        :param expire_time: 锁自动过期时间(秒)
        """
        self.redis = redis_client
        self.lock_name = f"lock:{lock_name}"  # 添加前缀避免冲突
        self.expire_time = expire_time
        self.identifier = str(uuid.uuid4())  # 唯一标识符

    def acquire(self, timeout=10):
        """
        获取锁
        :param timeout: 获取锁的超时时间(秒)
        :return: bool 是否成功获取锁
        """
        end = time.time() + timeout
        while time.time() < end:
            # SET key value NX EX seconds
            if self.redis.set(
                    self.lock_name,
                    self.identifier,
                    nx=True,
                    ex=self.expire_time
            ):
                return True
            time.sleep(0.01)  # 适当等待避免CPU空转
        return False

    def release(self):
        """
        释放锁(使用Lua脚本保证原子性)
        """
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(lua_script, 1, self.lock_name, self.identifier)

    # 实现上下文管理器协议 进入上下文时调用 (获取锁)
    def __enter__(self): 
        if not self.acquire():
            raise Exception(f"Failed to acquire lock {self.lock_name}")
        return self
    # 实现上下文管理器协议 退出上下文时调用 (释放锁)
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

2.2 使用示例

python
# 创建Redis连接
r = redis.Redis(host='localhost', port=6379, db=0)

# 使用方式1:with语句(推荐)
with RedisLock(r, "order_123"):
    # 执行需要加锁的业务逻辑
    print("处理订单123...")

# 使用方式2:手动获取释放
lock = RedisLock(r, "inventory_456")
try:
    if lock.acquire(timeout=5):
        print("修改库存456...")
    else:
        print("获取锁超时")
finally:
    lock.release()

3. 高级实现方案

3.1 Redlock 算法(分布式环境)

python
from redlock import Redlock

# 初始化(连接多个Redis节点)
dlm = Redlock([
    {"host": "redis1.example.com", "port": 6379},
    {"host": "redis2.example.com", "port": 6379},
    {"host": "redis3.example.com", "port": 6379},
])

# 获取锁(有效期10秒)
lock = dlm.lock("global_resource", 10000)

if lock:
    try:
        print("处理全局资源...")
    finally:
        dlm.unlock(lock)
else:
    print("获取分布式锁失败")

3.2 锁续期机制

python
class RedisLockWithRenewal(RedisLock):
    def __init__(self, *args, renewal_interval=10, **kwargs):
        super().__init__(*args, **kwargs)
        self.renewal_interval = renewal_interval
        self._stop_renewal = False

    def _renew_lock(self):
        """后台线程续期锁"""
        while not self._stop_renewal:
            time.sleep(self.renewal_interval)
            self.redis.expire(self.lock_name, self.expire_time)

    def __enter__(self):
        if not self.acquire():
            raise Exception("Failed to acquire lock")

        # 启动续期线程
        self._stop_renewal = False
        import threading
        threading.Thread(
            target=self._renew_lock,
            daemon=True
        ).start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._stop_renewal = True
        self.release()

4. 关键注意事项

  1. 原子性操作: • 获取锁必须使用 SET key value NX EX seconds 命令组合 • 释放锁必须使用 Lua 脚本保证原子性

  2. 锁命名规范: • 使用业务相关的前缀(如 order_lock:{order_id}) • 避免使用通用名称导致冲突

  3. 超时时间设置: • 过期时间应大于业务处理时间但不宜过长 • 典型值:30-60秒(根据业务调整)

  4. 异常处理: • 网络问题可能导致锁状态不一致 • 建议添加重试机制和监控告警

  5. 性能考量: • 避免频繁争抢锁(可加入随机等待) • 热点键问题可考虑分片

5. 常见问题解决方案

问题1:锁提前过期 • 现象:业务未完成但锁已释放 • 解决方案:实现锁续期机制(如3.2示例)

问题2:误释放他人锁 • 现象:A客户端释放了B客户端的锁 • 解决方案:必须使用唯一标识符验证(已在基础实现中包含)

问题3:Redis单点故障 • 现象:单Redis节点宕机导致锁服务不可用 • 解决方案:使用Redis哨兵/集群或Redlock算法

6. 生产环境建议

  1. 监控指标: • 锁获取成功率 • 平均等待时间 • 锁持有时间分布

  2. 日志记录

python
   import logging

   logging.basicConfig(level=logging.INFO)


   class LoggingRedisLock(RedisLock):
    def acquire(self, timeout=10):
        logging.info(f"Attempting to acquire lock {self.lock_name}")
        result = super().acquire(timeout)
        logging.info(f"Lock {'obtained' if result else 'failed'}")
        return result
  1. 熔断机制: • 当锁服务不可用时应有降级方案 • 例如本地锁+Redis锁的混合模式

7. 扩展阅读

  1. Redis官方分布式锁文档:https://redis.io/docs/manual/patterns/distributed-locks/
  2. Redlock算法论文:https://redis.io/topics/distlock
  3. 分布式系统协调服务对比(Redis/ZooKeeper/etcd)

✨ 网站运行时间: 3年11月15天 ❤️ 道阻且长,行则将至 - 微信号: heikedreamer