Chapter 05

分布式锁原理与实现

从 SETNX 到 Redlock,从 Lua 脚本原子性到锁续期——分布式锁的完整设计哲学

为什么需要分布式锁

在单进程程序中,使用线程锁(mutex)可以防止并发竞争。但在分布式系统中,多台服务器上的多个进程同时访问共享资源(如数据库记录、库存计数),线程锁无效,需要一种全局的协调机制——分布式锁。

分布式库存扣减场景(无锁时的问题) Server A Server B 读库存: 1 读库存: 1 库存>0? YES 库存>0? YES 扣减库存 → 0 扣减库存 → 0 ↓ ↓ 创建订单 ✓ 创建订单 ✓ ← 超卖!库存只有1却卖了2

基础实现:SET NX PX

Redis 的 SET key value NX PX milliseconds 命令是原子操作,是分布式锁的基石:

绝对不能用两条命令替代:早期实现中有人先 SETNX key value,再 EXPIRE key seconds,这两条命令之间如果进程崩溃,锁永远不会过期,造成死锁。必须用一条 SET key value NX PX ms 原子命令。

锁的唯一标识(防止误删)

锁的 value 必须是当前持有者的唯一标识(UUID),释放锁时先验证是不是自己的锁再删除,防止 A 的锁超时后被 B 误删:

锁误删问题 进程 A 获得锁,value = "uuid-A" A 执行业务,时间过长 → 锁过期 进程 B 获得锁,value = "uuid-B" A 执行完,准备释放锁,执行 DEL lock A 错误地删除了 B 的锁!B 认为自己持有锁,实际已无锁保护 正确做法:验证 + 删除必须原子执行(使用 Lua) if GET lock == "uuid-A": ← 检查 DEL lock ← 删除 ← 这两步必须原子,否则在检查后删除前 A 的锁可能被抢占

完整分布式锁实现(Python)

import redis, uuid, time
from contextlib import contextmanager
from typing import Optional

# Lua 脚本:原子释放锁(验证 value 一致才删除)
RELEASE_SCRIPT = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end
"""

class RedisLock:
    def __init__(self, redis_client: redis.Redis,
                  lock_name: str,
                  expire_ms: int = 30000,   # 锁的过期时间(毫秒)
                  retry_times: int = 3,      # 重试次数
                  retry_delay_ms: int = 100):  # 重试间隔(毫秒)
        self.r = redis_client
        self.lock_key = f"dlock:{lock_name}"
        self.expire_ms = expire_ms
        self.retry_times = retry_times
        self.retry_delay_ms = retry_delay_ms
        self.lock_value: Optional[str] = None
        self._release_script = self.r.register_script(RELEASE_SCRIPT)

    def acquire(self) -> bool:
        """尝试获取锁,失败时重试"""
        self.lock_value = str(uuid.uuid4())
        for attempt in range(self.retry_times):
            result = self.r.set(
                self.lock_key,
                self.lock_value,
                nx=True,
                px=self.expire_ms
            )
            if result:
                return True
            if attempt < self.retry_times - 1:
                time.sleep(self.retry_delay_ms / 1000)
        return False

    def release(self) -> bool:
        """释放锁(Lua 脚本保证原子性)"""
        if not self.lock_value:
            return False
        result = self._release_script(
            keys=[self.lock_key],
            args=[self.lock_value]
        )
        self.lock_value = None
        return result == 1

    @contextmanager
    def lock(self):
        """上下文管理器用法"""
        acquired = self.acquire()
        if not acquired:
            raise RuntimeError(f"无法获取锁: {self.lock_key}")
        try:
            yield self
        finally:
            self.release()

# 使用示例:扣减库存
r = redis.Redis(host='localhost', decode_responses=True)

def deduct_stock(product_id: int, quantity: int) -> bool:
    lock = RedisLock(r, f"stock:{product_id}", expire_ms=5000)
    try:
        with lock.lock():
            # 临界区:安全地操作库存
            stock_key = f"stock:{product_id}"
            current = int(r.get(stock_key) or 0)
            if current < quantity:
                return False
            r.decrby(stock_key, quantity)
            return True
    except RuntimeError:
        return False  # 获取锁失败

Redlock 算法:多节点高可用锁

单节点 Redis 锁的风险:如果 Redis 主节点在写入锁后、同步到从节点前崩溃,故障转移后新主节点上没有这把锁,另一个客户端可以获得同一把锁,破坏互斥性。Redlock 通过多节点投票解决此问题。

Redlock 六步骤

Redlock 算法(N=5 个独立 Redis 节点) 步骤1: 记录当前时间 T1 步骤2: 依次向 N 个节点发送 SET lock uuid NX PX expire_ms Redis1 ✓ Redis2 ✓ Redis3 ✓ Redis4 ✗ Redis5 ✗ (获得 3/5 的投票) 步骤3: 计算获锁耗时 T2-T1 步骤4: 判断是否获锁成功: 条件1: 获得超过 N/2+1 个节点的锁(本例需≥3个) 条件2: 获锁耗时 < 锁过期时间(有效期 = expire_ms - 耗时) 步骤5: 使用锁(有效期 = 原过期时间 - 获锁耗时) 步骤6: 释放时向所有节点发送释放命令(无论是否获锁成功)
# pip install redlock-py
from redlock import Redlock, RedLockError

# 配置 5 个独立 Redis 节点(生产环境)
redlock = Redlock([
    {"host": "redis-1", "port": 6379, "db": 0},
    {"host": "redis-2", "port": 6379, "db": 0},
    {"host": "redis-3", "port": 6379, "db": 0},
    {"host": "redis-4", "port": 6379, "db": 0},
    {"host": "redis-5", "port": 6379, "db": 0},
], retry_count=3, retry_delay=0.2)

try:
    lock = redlock.lock("resource:order:create", 10000)  # 10秒
    if lock:
        try:
            process_critical_section()
        finally:
            redlock.unlock(lock)
except RedLockError as e:
    print(f"获取分布式锁失败: {e}")

锁续期(Watchdog)

当业务执行时间超过锁的过期时间时,锁自动释放会导致并发问题。Watchdog(看门狗)机制在后台定期续期锁:

import threading

class WatchdogLock(RedisLock):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._watchdog_thread: Optional[threading.Thread] = None
        self._running = False

    def _watchdog(self):
        """每隔 expire/3 毫秒续期一次"""
        interval = self.expire_ms / 3 / 1000
        while self._running:
            time.sleep(interval)
            if self._running and self.lock_value:
                # 仅当锁还是自己的才续期
                self.r.expire(self.lock_key, self.expire_ms // 1000)

    def acquire(self) -> bool:
        result = super().acquire()
        if result:
            self._running = True
            self._watchdog_thread = threading.Thread(
                target=self._watchdog, daemon=True)
            self._watchdog_thread.start()
        return result

    def release(self) -> bool:
        self._running = False
        return super().release()

与其他分布式锁对比

方案实现复杂度性能可靠性适用场景
Redis 单节点锁极高(微秒级)中(节点故障有风险)对一致性要求不极端的场景
Redlock高(需多次网络往返)高(多数节点存活即可)对强一致要求的关键操作
ZooKeeper中(毫秒级)极高(CP 系统)金融级强一致性
etcd极高(Raft 共识)Kubernetes 等云原生场景
数据库乐观锁低(DB 瓶颈)并发量小、已有 DB 事务

Redlock 争议:分布式系统专家 Martin Kleppmann 曾撰文指出 Redlock 在进程暂停(GC stop-the-world)或网络延迟场景下仍不安全。Redis 作者 antirez 也做了回应。实际项目中,如果对数据安全有极致要求,应使用 ZooKeeper 或 etcd;Redis 锁更适合对性能要求高、允许小概率误差的场景(如限流、幂等控制)。