为什么需要分布式锁
在单进程程序中,使用线程锁(mutex)可以防止并发竞争。但在分布式系统中,多台服务器上的多个进程同时访问共享资源(如数据库记录、库存计数),线程锁无效,需要一种全局的协调机制——分布式锁。
基础实现:SET NX PX
Redis 的 SET key value NX PX milliseconds 命令是原子操作,是分布式锁的基石:
NX:仅当 key 不存在时设置(Not eXists)PX milliseconds:设置过期时间(毫秒),防止死锁- 返回 OK 表示加锁成功,返回 nil 表示锁已被占用
绝对不能用两条命令替代:早期实现中有人先 SETNX key value,再 EXPIRE key seconds,这两条命令之间如果进程崩溃,锁永远不会过期,造成死锁。必须用一条 SET key value NX PX ms 原子命令。
锁的唯一标识(防止误删)
锁的 value 必须是当前持有者的唯一标识(UUID),释放锁时先验证是不是自己的锁再删除,防止 A 的锁超时后被 B 误删:
完整分布式锁实现(Python)
import redis, uuid, time
from contextlib import contextmanager
from typing import Optional
# Lua 脚本:原子释放锁(验证 value 一致才删除)
RELEASE_SCRIPT = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
"""
class RedisLock:
def __init__(self, redis_client: redis.Redis,
lock_name: str,
expire_ms: int = 30000, # 锁的过期时间(毫秒)
retry_times: int = 3, # 重试次数
retry_delay_ms: int = 100): # 重试间隔(毫秒)
self.r = redis_client
self.lock_key = f"dlock:{lock_name}"
self.expire_ms = expire_ms
self.retry_times = retry_times
self.retry_delay_ms = retry_delay_ms
self.lock_value: Optional[str] = None
self._release_script = self.r.register_script(RELEASE_SCRIPT)
def acquire(self) -> bool:
"""尝试获取锁,失败时重试"""
self.lock_value = str(uuid.uuid4())
for attempt in range(self.retry_times):
result = self.r.set(
self.lock_key,
self.lock_value,
nx=True,
px=self.expire_ms
)
if result:
return True
if attempt < self.retry_times - 1:
time.sleep(self.retry_delay_ms / 1000)
return False
def release(self) -> bool:
"""释放锁(Lua 脚本保证原子性)"""
if not self.lock_value:
return False
result = self._release_script(
keys=[self.lock_key],
args=[self.lock_value]
)
self.lock_value = None
return result == 1
@contextmanager
def lock(self):
"""上下文管理器用法"""
acquired = self.acquire()
if not acquired:
raise RuntimeError(f"无法获取锁: {self.lock_key}")
try:
yield self
finally:
self.release()
# 使用示例:扣减库存
r = redis.Redis(host='localhost', decode_responses=True)
def deduct_stock(product_id: int, quantity: int) -> bool:
lock = RedisLock(r, f"stock:{product_id}", expire_ms=5000)
try:
with lock.lock():
# 临界区:安全地操作库存
stock_key = f"stock:{product_id}"
current = int(r.get(stock_key) or 0)
if current < quantity:
return False
r.decrby(stock_key, quantity)
return True
except RuntimeError:
return False # 获取锁失败
Redlock 算法:多节点高可用锁
单节点 Redis 锁的风险:如果 Redis 主节点在写入锁后、同步到从节点前崩溃,故障转移后新主节点上没有这把锁,另一个客户端可以获得同一把锁,破坏互斥性。Redlock 通过多节点投票解决此问题。
Redlock 六步骤
# pip install redlock-py
from redlock import Redlock, RedLockError
# 配置 5 个独立 Redis 节点(生产环境)
redlock = Redlock([
{"host": "redis-1", "port": 6379, "db": 0},
{"host": "redis-2", "port": 6379, "db": 0},
{"host": "redis-3", "port": 6379, "db": 0},
{"host": "redis-4", "port": 6379, "db": 0},
{"host": "redis-5", "port": 6379, "db": 0},
], retry_count=3, retry_delay=0.2)
try:
lock = redlock.lock("resource:order:create", 10000) # 10秒
if lock:
try:
process_critical_section()
finally:
redlock.unlock(lock)
except RedLockError as e:
print(f"获取分布式锁失败: {e}")
锁续期(Watchdog)
当业务执行时间超过锁的过期时间时,锁自动释放会导致并发问题。Watchdog(看门狗)机制在后台定期续期锁:
import threading
class WatchdogLock(RedisLock):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._watchdog_thread: Optional[threading.Thread] = None
self._running = False
def _watchdog(self):
"""每隔 expire/3 毫秒续期一次"""
interval = self.expire_ms / 3 / 1000
while self._running:
time.sleep(interval)
if self._running and self.lock_value:
# 仅当锁还是自己的才续期
self.r.expire(self.lock_key, self.expire_ms // 1000)
def acquire(self) -> bool:
result = super().acquire()
if result:
self._running = True
self._watchdog_thread = threading.Thread(
target=self._watchdog, daemon=True)
self._watchdog_thread.start()
return result
def release(self) -> bool:
self._running = False
return super().release()
与其他分布式锁对比
| 方案 | 实现复杂度 | 性能 | 可靠性 | 适用场景 |
|---|---|---|---|---|
| Redis 单节点锁 | 低 | 极高(微秒级) | 中(节点故障有风险) | 对一致性要求不极端的场景 |
| Redlock | 中 | 高(需多次网络往返) | 高(多数节点存活即可) | 对强一致要求的关键操作 |
| ZooKeeper | 高 | 中(毫秒级) | 极高(CP 系统) | 金融级强一致性 |
| etcd | 中 | 中 | 极高(Raft 共识) | Kubernetes 等云原生场景 |
| 数据库乐观锁 | 低 | 低(DB 瓶颈) | 高 | 并发量小、已有 DB 事务 |
Redlock 争议:分布式系统专家 Martin Kleppmann 曾撰文指出 Redlock 在进程暂停(GC stop-the-world)或网络延迟场景下仍不安全。Redis 作者 antirez 也做了回应。实际项目中,如果对数据安全有极致要求,应使用 ZooKeeper 或 etcd;Redis 锁更适合对性能要求高、允许小概率误差的场景(如限流、幂等控制)。
分布式锁的三大核心要素
锁超时问题深度分析
分布式锁无法完全防止 GC 暂停竞争:如果业务代码可能遭遇长时间 GC(Java 老年代 GC、Python GIL 长时间阻塞),即使使用 Watchdog 续期也有风险——GC 期间 Watchdog 线程也无法运行。完整解决方案需要在存储层引入 Fencing Token(版本号/乐观锁),Redis 分布式锁适合"尽力保证"而非"绝对保证"互斥的场景。
基于 Redis 的限流实现
分布式锁和限流是"锁"思想的两种变体。限流使用 Redis 的原子计数或滑动窗口实现:
# 固定窗口限流:每分钟最多 100 次请求
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local expire = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
redis.call('EXPIRE', key, expire) -- 第一次设置过期时间
end
if current > limit then
return 0 -- 超出限制
else
return 1 -- 允许通过
end
"""
rate_limit = r.register_script(RATE_LIMIT_SCRIPT)
def check_rate_limit(user_id: str, limit: int = 100, window: int = 60) -> bool:
"""返回 True 表示允许,False 表示被限流"""
import time
# key 包含时间窗口,每分钟重置
window_start = int(time.time()) // window
key = f"ratelimit:{user_id}:{window_start}"
result = rate_limit(keys=[key], args=[str(limit), str(window)])
return result == 1
# 使用示例
if check_rate_limit("user:1001", limit=100):
process_request()
else:
return_429_too_many_requests()
分布式锁是 Redis 在工程中最有挑战性的应用之一:
基础实现:SET key uuid NX PX ms 是原子加锁;Lua 脚本实现原子释放(GET+DEL);lock_value 使用 UUID 防止误删。
可靠性升级:Watchdog 定期续期防止业务超时导致锁失效;Redlock 多节点投票解决单节点故障问题。
局限性认知:Redis 锁无法防止 GC STW 导致的竞争,完整方案需要存储层的 Fencing Token;对于金融级一致性要求,ZooKeeper/etcd 更合适。
实际选型:对于绝大多数业务场景(限流、防重提交、秒杀库存),Redis 单节点锁性能足够、可靠性达标。