Chapter 09

性能优化:Pipeline 与 Lua

消灭 RTT 延迟、Lua 脚本原子性、bigkey 诊断——Redis 性能调优全景

RTT:网络往返时间的杀手

每条 Redis 命令需要一次网络往返(RTT)。本地局域网 RTT 约 0.1ms,跨机房可达 1~5ms。当需要批量执行命令时,RTT 的累积开销可能远超 Redis 本身的处理时间。

无 Pipeline:N 次 RTT Client ──SET k1──▶ Redis ◀───OK───── Redis ← 等待 RTT ──SET k2──▶ Redis ◀───OK───── Redis ← 再等 RTT ... N次命令 = N × RTT 延迟 Pipeline:1 次 RTT Client ──SET k1──▶ ──SET k2──▶ Redis(批量处理) ──SET k3──▶ ◀─OK+OK+OK── Redis ← 只等 1 次 RTT

Pipeline 批处理

Pipeline 将多条命令打包一次发送,服务端顺序执行后批量返回结果。注意:Pipeline 中的命令是非原子的,其他客户端的命令可以插入执行。

import redis, time

r = redis.Redis(host='localhost', decode_responses=True)

# 对比:逐条执行 vs Pipeline
N = 1000

# 方式1:逐条执行(慢)
start = time.time()
for i in range(N):
    r.set(f"key:{i}", i)
print(f"逐条: {(time.time()-start)*1000:.0f}ms")  # ~500ms

# 方式2:Pipeline(快)
start = time.time()
with r.pipeline(transaction=False) as pipe:
    for i in range(N):
        pipe.set(f"key:{i}", i)
    results = pipe.execute()
print(f"Pipeline: {(time.time()-start)*1000:.0f}ms")  # ~10ms

# 分批 Pipeline(避免单次发送数据过多)
def batch_pipeline(data: dict, batch_size: int = 500) -> None:
    items = list(data.items())
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        with r.pipeline(transaction=False) as pipe:
            for k, v in batch:
                pipe.set(k, v)
            pipe.execute()

MULTI/EXEC 事务

Redis 事务将多条命令打包为一个原子执行块,中途不会插入其他命令。但与关系型数据库不同,Redis 事务不支持回滚——即使某条命令执行失败,其余命令仍会继续执行。

# 事务示例:转账操作
# MULTI ... EXEC 之间的命令保证原子性(不被其他客户端插入)
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379(TX)> DECRBY account:alice 100
QUEUED
127.0.0.1:6379(TX)> INCRBY account:bob 100
QUEUED
127.0.0.1:6379(TX)> EXEC
1) (integer) 900
2) (integer) 1100

WATCH 实现乐观锁WATCH key 监视某个 key,如果在 MULTI 到 EXEC 之间该 key 被其他客户端修改,EXEC 会失败(返回 nil)。这实现了 CAS(Compare-And-Swap)语义,但需要重试逻辑。在高并发场景推荐用 Lua 脚本替代。

Lua 脚本:真正的原子性

Lua 脚本在 Redis 单线程中以原子方式执行,相比 MULTI/EXEC 更强大——可以包含条件判断和循环,且整个脚本执行期间不会被其他命令打断。

# EVAL script numkeys key [key ...] arg [arg ...]

# 示例:原子条件更新(仅当值等于预期值时才更新)
CAS_SCRIPT = """
local current = redis.call('GET', KEYS[1])
if current == ARGV[1] then
    redis.call('SET', KEYS[1], ARGV[2])
    return 1   -- 更新成功
else
    return 0   -- 条件不匹配
end
"""

# 注册脚本(获得 SHA 哈希,后续用 EVALSHA 调用,避免重复传输脚本)
cas_sha = r.script_load(CAS_SCRIPT)

# 使用 EVALSHA 调用
result = r.evalsha(cas_sha, 1, 'counter', '100', '200')
# 仅当 counter == '100' 时才设为 '200'
# Python 推荐:使用 register_script
CHECK_AND_SET = """
local val = redis.call('GET', KEYS[1])
if tonumber(val) >= tonumber(ARGV[1]) then
    return redis.call('DECRBY', KEYS[1], ARGV[1])
else
    return redis.error_reply('INSUFFICIENT')
end
"""

deduct = r.register_script(CHECK_AND_SET)

try:
    remaining = deduct(keys=['stock:1001'], args=['5'])
    print(f"剩余库存: {remaining}")
except redis.exceptions.ResponseError as e:
    print(f"库存不足: {e}")

慢查询日志

# 配置慢查询(超过 10000 微秒=10ms 记录)
slowlog-log-slower-than 10000   # 单位:微秒
slowlog-max-len 128             # 最多保留128条

# 查看慢查询日志
SLOWLOG GET 10   # 获取最近10条
SLOWLOG LEN      # 日志总数
SLOWLOG RESET    # 清空日志

# 输出示例:
# 1) 1) (integer) 14          ← 日志 ID
#    2) (integer) 1700000001  ← Unix 时间戳
#    3) (integer) 15420       ← 执行时间(微秒)
#    4) 1) "KEYS"             ← 命令
#       2) "*"                ← 参数

bigkey 检测与优化

bigkey 是指 value 体积过大的 key(String > 10KB,集合元素 > 5000),会导致:网络传输慢、内存分配不均、删除时阻塞 Redis。

# 方法1:redis-cli 扫描(推荐在低峰期执行)
redis-cli --bigkeys --i 0.1   # --i 控制扫描间隔(秒),避免影响生产

# 方法2:OBJECT ENCODING 和 OBJECT IDLETIME
OBJECT ENCODING mykey      # 查看编码类型
OBJECT IDLETIME mykey      # 多少秒未被访问
OBJECT FREQ mykey          # LFU 访问频率

# 方法3:Python 扫描脚本
import redis

r = redis.Redis(host='localhost', decode_responses=True)
THRESHOLD_BYTES = 10240  # 10KB

def find_bigkeys(threshold: int = THRESHOLD_BYTES) -> list:
    bigkeys = []
    cursor = 0
    while True:
        cursor, keys = r.scan(cursor, count=100)
        for key in keys:
            dtype = r.type(key)
            if dtype == 'string':
                size = r.strlen(key)
            elif dtype == 'hash':
                size = r.hlen(key)
            elif dtype == 'list':
                size = r.llen(key)
            elif dtype == 'set':
                size = r.scard(key)
            elif dtype == 'zset':
                size = r.zcard(key)
            else:
                continue
            if size > threshold:
                bigkeys.append({'key': key, 'type': dtype, 'size': size})
        if cursor == 0:
            break
    return sorted(bigkeys, key=lambda x: x['size'], reverse=True)

bigkey 优化策略

场景问题解决方案
大 String单次传输慢压缩(msgpack/snappy)、分片存储
大 Hash(万级字段)HGETALL 阻塞拆分为多个小 Hash(如按字段范围分片)
大 List/Set/ZSet操作慢、迁移慢按时间或范围分页拆分
删除大 keyDEL 阻塞使用 UNLINK(异步删除)

DEL 大 key 阻塞 Redis:对一个有 100 万个元素的 ZSet 执行 DEL,Redis 需要逐一释放内存,可能阻塞数百毫秒。生产环境应使用 UNLINK 命令,它将删除操作放入后台线程异步执行,主线程立即返回。Redis 4.0+ 支持。

内存优化:淘汰策略

# redis.conf 内存配置
maxmemory 4gb                  # 最大内存限制
maxmemory-policy allkeys-lru   # 淘汰策略
策略淘汰范围算法适用场景
noeviction不淘汰内存溢出直接报错(默认)
allkeys-lru所有 keyLRU 最近最少使用缓存场景(推荐)
volatile-lru有 TTL 的 keyLRU混合存储(部分 key 为永久数据)
allkeys-lfu所有 keyLFU 最不频繁使用有明显冷热分布的缓存
volatile-ttl有 TTL 的 key优先淘汰 TTL 最短的
allkeys-random所有 key随机访问模式均匀时