RTT:网络往返时间的杀手
每条 Redis 命令需要一次网络往返(RTT)。本地局域网 RTT 约 0.1ms,跨机房可达 1~5ms。当需要批量执行命令时,RTT 的累积开销可能远超 Redis 本身的处理时间。
Pipeline 批处理
Pipeline 将多条命令打包一次发送,服务端顺序执行后批量返回结果。注意:Pipeline 中的命令是非原子的,其他客户端的命令可以插入执行。
import redis, time
r = redis.Redis(host='localhost', decode_responses=True)
# 对比:逐条执行 vs Pipeline
N = 1000
# 方式1:逐条执行(慢)
start = time.time()
for i in range(N):
r.set(f"key:{i}", i)
print(f"逐条: {(time.time()-start)*1000:.0f}ms") # ~500ms
# 方式2:Pipeline(快)
start = time.time()
with r.pipeline(transaction=False) as pipe:
for i in range(N):
pipe.set(f"key:{i}", i)
results = pipe.execute()
print(f"Pipeline: {(time.time()-start)*1000:.0f}ms") # ~10ms
# 分批 Pipeline(避免单次发送数据过多)
def batch_pipeline(data: dict, batch_size: int = 500) -> None:
items = list(data.items())
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
with r.pipeline(transaction=False) as pipe:
for k, v in batch:
pipe.set(k, v)
pipe.execute()
MULTI/EXEC 事务
Redis 事务将多条命令打包为一个原子执行块,中途不会插入其他命令。但与关系型数据库不同,Redis 事务不支持回滚——即使某条命令执行失败,其余命令仍会继续执行。
# 事务示例:转账操作
# MULTI ... EXEC 之间的命令保证原子性(不被其他客户端插入)
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379(TX)> DECRBY account:alice 100
QUEUED
127.0.0.1:6379(TX)> INCRBY account:bob 100
QUEUED
127.0.0.1:6379(TX)> EXEC
1) (integer) 900
2) (integer) 1100
WATCH 实现乐观锁:WATCH key 监视某个 key,如果在 MULTI 到 EXEC 之间该 key 被其他客户端修改,EXEC 会失败(返回 nil)。这实现了 CAS(Compare-And-Swap)语义,但需要重试逻辑。在高并发场景推荐用 Lua 脚本替代。
Lua 脚本:真正的原子性
Lua 脚本在 Redis 单线程中以原子方式执行,相比 MULTI/EXEC 更强大——可以包含条件判断和循环,且整个脚本执行期间不会被其他命令打断。
# EVAL script numkeys key [key ...] arg [arg ...]
# 示例:原子条件更新(仅当值等于预期值时才更新)
CAS_SCRIPT = """
local current = redis.call('GET', KEYS[1])
if current == ARGV[1] then
redis.call('SET', KEYS[1], ARGV[2])
return 1 -- 更新成功
else
return 0 -- 条件不匹配
end
"""
# 注册脚本(获得 SHA 哈希,后续用 EVALSHA 调用,避免重复传输脚本)
cas_sha = r.script_load(CAS_SCRIPT)
# 使用 EVALSHA 调用
result = r.evalsha(cas_sha, 1, 'counter', '100', '200')
# 仅当 counter == '100' 时才设为 '200'
# Python 推荐:使用 register_script
CHECK_AND_SET = """
local val = redis.call('GET', KEYS[1])
if tonumber(val) >= tonumber(ARGV[1]) then
return redis.call('DECRBY', KEYS[1], ARGV[1])
else
return redis.error_reply('INSUFFICIENT')
end
"""
deduct = r.register_script(CHECK_AND_SET)
try:
remaining = deduct(keys=['stock:1001'], args=['5'])
print(f"剩余库存: {remaining}")
except redis.exceptions.ResponseError as e:
print(f"库存不足: {e}")
慢查询日志
# 配置慢查询(超过 10000 微秒=10ms 记录)
slowlog-log-slower-than 10000 # 单位:微秒
slowlog-max-len 128 # 最多保留128条
# 查看慢查询日志
SLOWLOG GET 10 # 获取最近10条
SLOWLOG LEN # 日志总数
SLOWLOG RESET # 清空日志
# 输出示例:
# 1) 1) (integer) 14 ← 日志 ID
# 2) (integer) 1700000001 ← Unix 时间戳
# 3) (integer) 15420 ← 执行时间(微秒)
# 4) 1) "KEYS" ← 命令
# 2) "*" ← 参数
bigkey 检测与优化
bigkey 是指 value 体积过大的 key(String > 10KB,集合元素 > 5000),会导致:网络传输慢、内存分配不均、删除时阻塞 Redis。
# 方法1:redis-cli 扫描(推荐在低峰期执行)
redis-cli --bigkeys --i 0.1 # --i 控制扫描间隔(秒),避免影响生产
# 方法2:OBJECT ENCODING 和 OBJECT IDLETIME
OBJECT ENCODING mykey # 查看编码类型
OBJECT IDLETIME mykey # 多少秒未被访问
OBJECT FREQ mykey # LFU 访问频率
# 方法3:Python 扫描脚本
import redis
r = redis.Redis(host='localhost', decode_responses=True)
THRESHOLD_BYTES = 10240 # 10KB
def find_bigkeys(threshold: int = THRESHOLD_BYTES) -> list:
bigkeys = []
cursor = 0
while True:
cursor, keys = r.scan(cursor, count=100)
for key in keys:
dtype = r.type(key)
if dtype == 'string':
size = r.strlen(key)
elif dtype == 'hash':
size = r.hlen(key)
elif dtype == 'list':
size = r.llen(key)
elif dtype == 'set':
size = r.scard(key)
elif dtype == 'zset':
size = r.zcard(key)
else:
continue
if size > threshold:
bigkeys.append({'key': key, 'type': dtype, 'size': size})
if cursor == 0:
break
return sorted(bigkeys, key=lambda x: x['size'], reverse=True)
bigkey 优化策略
| 场景 | 问题 | 解决方案 |
|---|---|---|
| 大 String | 单次传输慢 | 压缩(msgpack/snappy)、分片存储 |
| 大 Hash(万级字段) | HGETALL 阻塞 | 拆分为多个小 Hash(如按字段范围分片) |
| 大 List/Set/ZSet | 操作慢、迁移慢 | 按时间或范围分页拆分 |
| 删除大 key | DEL 阻塞 | 使用 UNLINK(异步删除) |
DEL 大 key 阻塞 Redis:对一个有 100 万个元素的 ZSet 执行 DEL,Redis 需要逐一释放内存,可能阻塞数百毫秒。生产环境应使用 UNLINK 命令,它将删除操作放入后台线程异步执行,主线程立即返回。Redis 4.0+ 支持。
内存优化:淘汰策略
# redis.conf 内存配置
maxmemory 4gb # 最大内存限制
maxmemory-policy allkeys-lru # 淘汰策略
| 策略 | 淘汰范围 | 算法 | 适用场景 |
|---|---|---|---|
noeviction | 不淘汰 | — | 内存溢出直接报错(默认) |
allkeys-lru | 所有 key | LRU 最近最少使用 | 缓存场景(推荐) |
volatile-lru | 有 TTL 的 key | LRU | 混合存储(部分 key 为永久数据) |
allkeys-lfu | 所有 key | LFU 最不频繁使用 | 有明显冷热分布的缓存 |
volatile-ttl | 有 TTL 的 key | 优先淘汰 TTL 最短的 | — |
allkeys-random | 所有 key | 随机 | 访问模式均匀时 |
Redis Functions:Lua 脚本的现代替代
Redis 7.0 引入 Functions,取代临时性的 EVAL/EVALSHA 脚本执行模式。Functions 将 Lua 代码作为库持久化到 Redis 服务端,服务重启后自动加载,无需应用层每次重新注册:
# 定义 Redis Function(Lua 库)
FUNCTION LOAD #!lua name=mylib
# 在 redis-cli 中加载完整库(使用 heredoc)
redis-cli FUNCTION LOAD "#!lua name=ratelimit
local function check_limit(keys, args)
local key = keys[1]
local limit = tonumber(args[1])
local window = tonumber(args[2])
local count = redis.call('INCR', key)
if count == 1 then
redis.call('EXPIRE', key, window)
end
return count <= limit and 1 or 0
end
redis.register_function('check_limit', check_limit)
"
# 调用 Function(语法与 EVALSHA 类似但更清晰)
FCALL check_limit 1 ratelimit:user:1001 100 60
# 查看已加载的函数库
FUNCTION LIST
FUNCTION STATS
# Python 调用
# Python 调用 Redis Functions
import redis
r = redis.Redis(host='localhost', decode_responses=True)
# 加载函数库(只需执行一次,Redis 重启后自动恢复)
RATELIMIT_LIB = """
#!lua name=ratelimit
local function check_limit(keys, args)
local key = keys[1]
local limit = tonumber(args[1])
local window = tonumber(args[2])
local count = redis.call('INCR', key)
if count == 1 then
redis.call('EXPIRE', key, window)
end
if count > limit then
return redis.error_reply('RATE_LIMIT_EXCEEDED')
end
return count
end
redis.register_function('check_limit', check_limit)
"""
try:
r.function_load(RATELIMIT_LIB)
except redis.ResponseError as e:
if 'already exists' not in str(e):
raise
# 调用:只传函数名和参数,无需传输代码
try:
count = r.fcall('check_limit', 1, 'ratelimit:user:1001', 100, 60)
print(f"当前请求次数: {count}")
except redis.ResponseError as e:
print(f"限流触发: {e}")
性能优化清单
| 优化手段 | 场景 | 预期收益 |
|---|---|---|
| Pipeline 批量命令 | N 次独立 SET/GET | 延迟从 N×RTT 降到 1×RTT,提速 10~100× |
| 连接池(ConnectionPool) | 所有生产场景 | 避免频繁建连开销,节约文件描述符 |
| UNLINK 替代 DEL | 删除大 key | 避免阻塞主线程,异步后台删除 |
| SCAN 替代 KEYS | 遍历所有 key | 避免全量阻塞,分批处理 |
| allkeys-lru 淘汰策略 | 纯缓存场景 | 自动淘汰冷数据,防止 OOM |
| listpack 编码保持 | Hash/ZSet 小数据 | 相比 hashtable 节省 60%+ 内存 |
| Lua/Functions 原子操作 | 复杂条件判断 | 消除 WATCH+MULTI/EXEC 的乐观锁重试 |
| 关闭 AOF(纯缓存) | 无持久化需求 | 写性能提升 30~50%,消除 fsync 开销 |
Redis 性能优化的核心是消灭不必要的开销:
网络层:Pipeline 合并命令减少 RTT;连接池复用连接;客户端部署在同一机房/机器上降低物理延迟。
命令层:用 MSET/MGET 替代多次 SET/GET;用 HSCAN 替代 HGETALL 处理大 Hash;用 UNLINK 替代 DEL 删除大 key;禁用 KEYS *。
脚本层:Lua 脚本和 Redis Functions 将多步操作变为单次原子执行,既减少 RTT 又保证原子性。7.0+ 优先使用 Functions,代码持久化到服务端,更便于维护。
内存层:保持数据结构在 listpack 编码范围内;设置合理的 maxmemory 和淘汰策略;定期用 redis-cli --bigkeys 排查大 key。
慢查询诊断:slowlog-log-slower-than 10000(10ms)记录慢命令,定期分析并优化。