为什么需要分布式锁
在单进程程序中,使用线程锁(mutex)可以防止并发竞争。但在分布式系统中,多台服务器上的多个进程同时访问共享资源(如数据库记录、库存计数),线程锁无效,需要一种全局的协调机制——分布式锁。
分布式库存扣减场景(无锁时的问题)
Server A Server B
读库存: 1 读库存: 1
库存>0? YES 库存>0? YES
扣减库存 → 0 扣减库存 → 0
↓ ↓
创建订单 ✓ 创建订单 ✓ ← 超卖!库存只有1却卖了2
基础实现:SET NX PX
Redis 的 SET key value NX PX milliseconds 命令是原子操作,是分布式锁的基石:
NX:仅当 key 不存在时设置(Not eXists)PX milliseconds:设置过期时间(毫秒),防止死锁- 返回 OK 表示加锁成功,返回 nil 表示锁已被占用
绝对不能用两条命令替代:早期实现中有人先 SETNX key value,再 EXPIRE key seconds,这两条命令之间如果进程崩溃,锁永远不会过期,造成死锁。必须用一条 SET key value NX PX ms 原子命令。
锁的唯一标识(防止误删)
锁的 value 必须是当前持有者的唯一标识(UUID),释放锁时先验证是不是自己的锁再删除,防止 A 的锁超时后被 B 误删:
锁误删问题
进程 A 获得锁,value = "uuid-A"
A 执行业务,时间过长 → 锁过期
进程 B 获得锁,value = "uuid-B"
A 执行完,准备释放锁,执行 DEL lock
A 错误地删除了 B 的锁!B 认为自己持有锁,实际已无锁保护
正确做法:验证 + 删除必须原子执行(使用 Lua)
if GET lock == "uuid-A": ← 检查
DEL lock ← 删除
← 这两步必须原子,否则在检查后删除前 A 的锁可能被抢占
完整分布式锁实现(Python)
import redis, uuid, time
from contextlib import contextmanager
from typing import Optional
# Lua 脚本:原子释放锁(验证 value 一致才删除)
RELEASE_SCRIPT = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
"""
class RedisLock:
def __init__(self, redis_client: redis.Redis,
lock_name: str,
expire_ms: int = 30000, # 锁的过期时间(毫秒)
retry_times: int = 3, # 重试次数
retry_delay_ms: int = 100): # 重试间隔(毫秒)
self.r = redis_client
self.lock_key = f"dlock:{lock_name}"
self.expire_ms = expire_ms
self.retry_times = retry_times
self.retry_delay_ms = retry_delay_ms
self.lock_value: Optional[str] = None
self._release_script = self.r.register_script(RELEASE_SCRIPT)
def acquire(self) -> bool:
"""尝试获取锁,失败时重试"""
self.lock_value = str(uuid.uuid4())
for attempt in range(self.retry_times):
result = self.r.set(
self.lock_key,
self.lock_value,
nx=True,
px=self.expire_ms
)
if result:
return True
if attempt < self.retry_times - 1:
time.sleep(self.retry_delay_ms / 1000)
return False
def release(self) -> bool:
"""释放锁(Lua 脚本保证原子性)"""
if not self.lock_value:
return False
result = self._release_script(
keys=[self.lock_key],
args=[self.lock_value]
)
self.lock_value = None
return result == 1
@contextmanager
def lock(self):
"""上下文管理器用法"""
acquired = self.acquire()
if not acquired:
raise RuntimeError(f"无法获取锁: {self.lock_key}")
try:
yield self
finally:
self.release()
# 使用示例:扣减库存
r = redis.Redis(host='localhost', decode_responses=True)
def deduct_stock(product_id: int, quantity: int) -> bool:
lock = RedisLock(r, f"stock:{product_id}", expire_ms=5000)
try:
with lock.lock():
# 临界区:安全地操作库存
stock_key = f"stock:{product_id}"
current = int(r.get(stock_key) or 0)
if current < quantity:
return False
r.decrby(stock_key, quantity)
return True
except RuntimeError:
return False # 获取锁失败
Redlock 算法:多节点高可用锁
单节点 Redis 锁的风险:如果 Redis 主节点在写入锁后、同步到从节点前崩溃,故障转移后新主节点上没有这把锁,另一个客户端可以获得同一把锁,破坏互斥性。Redlock 通过多节点投票解决此问题。
Redlock 六步骤
Redlock 算法(N=5 个独立 Redis 节点)
步骤1: 记录当前时间 T1
步骤2: 依次向 N 个节点发送 SET lock uuid NX PX expire_ms
Redis1 ✓ Redis2 ✓ Redis3 ✓ Redis4 ✗ Redis5 ✗
(获得 3/5 的投票)
步骤3: 计算获锁耗时 T2-T1
步骤4: 判断是否获锁成功:
条件1: 获得超过 N/2+1 个节点的锁(本例需≥3个)
条件2: 获锁耗时 < 锁过期时间(有效期 = expire_ms - 耗时)
步骤5: 使用锁(有效期 = 原过期时间 - 获锁耗时)
步骤6: 释放时向所有节点发送释放命令(无论是否获锁成功)
# pip install redlock-py
from redlock import Redlock, RedLockError
# 配置 5 个独立 Redis 节点(生产环境)
redlock = Redlock([
{"host": "redis-1", "port": 6379, "db": 0},
{"host": "redis-2", "port": 6379, "db": 0},
{"host": "redis-3", "port": 6379, "db": 0},
{"host": "redis-4", "port": 6379, "db": 0},
{"host": "redis-5", "port": 6379, "db": 0},
], retry_count=3, retry_delay=0.2)
try:
lock = redlock.lock("resource:order:create", 10000) # 10秒
if lock:
try:
process_critical_section()
finally:
redlock.unlock(lock)
except RedLockError as e:
print(f"获取分布式锁失败: {e}")
锁续期(Watchdog)
当业务执行时间超过锁的过期时间时,锁自动释放会导致并发问题。Watchdog(看门狗)机制在后台定期续期锁:
import threading
class WatchdogLock(RedisLock):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._watchdog_thread: Optional[threading.Thread] = None
self._running = False
def _watchdog(self):
"""每隔 expire/3 毫秒续期一次"""
interval = self.expire_ms / 3 / 1000
while self._running:
time.sleep(interval)
if self._running and self.lock_value:
# 仅当锁还是自己的才续期
self.r.expire(self.lock_key, self.expire_ms // 1000)
def acquire(self) -> bool:
result = super().acquire()
if result:
self._running = True
self._watchdog_thread = threading.Thread(
target=self._watchdog, daemon=True)
self._watchdog_thread.start()
return result
def release(self) -> bool:
self._running = False
return super().release()
与其他分布式锁对比
| 方案 | 实现复杂度 | 性能 | 可靠性 | 适用场景 |
|---|---|---|---|---|
| Redis 单节点锁 | 低 | 极高(微秒级) | 中(节点故障有风险) | 对一致性要求不极端的场景 |
| Redlock | 中 | 高(需多次网络往返) | 高(多数节点存活即可) | 对强一致要求的关键操作 |
| ZooKeeper | 高 | 中(毫秒级) | 极高(CP 系统) | 金融级强一致性 |
| etcd | 中 | 中 | 极高(Raft 共识) | Kubernetes 等云原生场景 |
| 数据库乐观锁 | 低 | 低(DB 瓶颈) | 高 | 并发量小、已有 DB 事务 |
Redlock 争议:分布式系统专家 Martin Kleppmann 曾撰文指出 Redlock 在进程暂停(GC stop-the-world)或网络延迟场景下仍不安全。Redis 作者 antirez 也做了回应。实际项目中,如果对数据安全有极致要求,应使用 ZooKeeper 或 etcd;Redis 锁更适合对性能要求高、允许小概率误差的场景(如限流、幂等控制)。