Chapter 09

性能优化:Pipeline 与 Lua

消灭 RTT 延迟、Lua 脚本原子性、bigkey 诊断——Redis 性能调优全景

RTT:网络往返时间的杀手

每条 Redis 命令需要一次网络往返(RTT)。本地局域网 RTT 约 0.1ms,跨机房可达 1~5ms。当需要批量执行命令时,RTT 的累积开销可能远超 Redis 本身的处理时间。

无 Pipeline:N 次 RTT Client ──SET k1──▶ Redis ◀───OK───── Redis ← 等待 RTT ──SET k2──▶ Redis ◀───OK───── Redis ← 再等 RTT ... N次命令 = N × RTT 延迟 Pipeline:1 次 RTT Client ──SET k1──▶ ──SET k2──▶ Redis(批量处理) ──SET k3──▶ ◀─OK+OK+OK── Redis ← 只等 1 次 RTT

Pipeline 批处理

Pipeline 将多条命令打包一次发送,服务端顺序执行后批量返回结果。注意:Pipeline 中的命令是非原子的,其他客户端的命令可以插入执行。

import redis, time

r = redis.Redis(host='localhost', decode_responses=True)

# 对比:逐条执行 vs Pipeline
N = 1000

# 方式1:逐条执行(慢)
start = time.time()
for i in range(N):
    r.set(f"key:{i}", i)
print(f"逐条: {(time.time()-start)*1000:.0f}ms")  # ~500ms

# 方式2:Pipeline(快)
start = time.time()
with r.pipeline(transaction=False) as pipe:
    for i in range(N):
        pipe.set(f"key:{i}", i)
    results = pipe.execute()
print(f"Pipeline: {(time.time()-start)*1000:.0f}ms")  # ~10ms

# 分批 Pipeline(避免单次发送数据过多)
def batch_pipeline(data: dict, batch_size: int = 500) -> None:
    items = list(data.items())
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        with r.pipeline(transaction=False) as pipe:
            for k, v in batch:
                pipe.set(k, v)
            pipe.execute()

MULTI/EXEC 事务

Redis 事务将多条命令打包为一个原子执行块,中途不会插入其他命令。但与关系型数据库不同,Redis 事务不支持回滚——即使某条命令执行失败,其余命令仍会继续执行。

# 事务示例:转账操作
# MULTI ... EXEC 之间的命令保证原子性(不被其他客户端插入)
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379(TX)> DECRBY account:alice 100
QUEUED
127.0.0.1:6379(TX)> INCRBY account:bob 100
QUEUED
127.0.0.1:6379(TX)> EXEC
1) (integer) 900
2) (integer) 1100

WATCH 实现乐观锁WATCH key 监视某个 key,如果在 MULTI 到 EXEC 之间该 key 被其他客户端修改,EXEC 会失败(返回 nil)。这实现了 CAS(Compare-And-Swap)语义,但需要重试逻辑。在高并发场景推荐用 Lua 脚本替代。

Lua 脚本:真正的原子性

Lua 脚本在 Redis 单线程中以原子方式执行,相比 MULTI/EXEC 更强大——可以包含条件判断和循环,且整个脚本执行期间不会被其他命令打断。

# EVAL script numkeys key [key ...] arg [arg ...]

# 示例:原子条件更新(仅当值等于预期值时才更新)
CAS_SCRIPT = """
local current = redis.call('GET', KEYS[1])
if current == ARGV[1] then
    redis.call('SET', KEYS[1], ARGV[2])
    return 1   -- 更新成功
else
    return 0   -- 条件不匹配
end
"""

# 注册脚本(获得 SHA 哈希,后续用 EVALSHA 调用,避免重复传输脚本)
cas_sha = r.script_load(CAS_SCRIPT)

# 使用 EVALSHA 调用
result = r.evalsha(cas_sha, 1, 'counter', '100', '200')
# 仅当 counter == '100' 时才设为 '200'
# Python 推荐:使用 register_script
CHECK_AND_SET = """
local val = redis.call('GET', KEYS[1])
if tonumber(val) >= tonumber(ARGV[1]) then
    return redis.call('DECRBY', KEYS[1], ARGV[1])
else
    return redis.error_reply('INSUFFICIENT')
end
"""

deduct = r.register_script(CHECK_AND_SET)

try:
    remaining = deduct(keys=['stock:1001'], args=['5'])
    print(f"剩余库存: {remaining}")
except redis.exceptions.ResponseError as e:
    print(f"库存不足: {e}")

慢查询日志

# 配置慢查询(超过 10000 微秒=10ms 记录)
slowlog-log-slower-than 10000   # 单位:微秒
slowlog-max-len 128             # 最多保留128条

# 查看慢查询日志
SLOWLOG GET 10   # 获取最近10条
SLOWLOG LEN      # 日志总数
SLOWLOG RESET    # 清空日志

# 输出示例:
# 1) 1) (integer) 14          ← 日志 ID
#    2) (integer) 1700000001  ← Unix 时间戳
#    3) (integer) 15420       ← 执行时间(微秒)
#    4) 1) "KEYS"             ← 命令
#       2) "*"                ← 参数

bigkey 检测与优化

bigkey 是指 value 体积过大的 key(String > 10KB,集合元素 > 5000),会导致:网络传输慢、内存分配不均、删除时阻塞 Redis。

# 方法1:redis-cli 扫描(推荐在低峰期执行)
redis-cli --bigkeys --i 0.1   # --i 控制扫描间隔(秒),避免影响生产

# 方法2:OBJECT ENCODING 和 OBJECT IDLETIME
OBJECT ENCODING mykey      # 查看编码类型
OBJECT IDLETIME mykey      # 多少秒未被访问
OBJECT FREQ mykey          # LFU 访问频率

# 方法3:Python 扫描脚本
import redis

r = redis.Redis(host='localhost', decode_responses=True)
THRESHOLD_BYTES = 10240  # 10KB

def find_bigkeys(threshold: int = THRESHOLD_BYTES) -> list:
    bigkeys = []
    cursor = 0
    while True:
        cursor, keys = r.scan(cursor, count=100)
        for key in keys:
            dtype = r.type(key)
            if dtype == 'string':
                size = r.strlen(key)
            elif dtype == 'hash':
                size = r.hlen(key)
            elif dtype == 'list':
                size = r.llen(key)
            elif dtype == 'set':
                size = r.scard(key)
            elif dtype == 'zset':
                size = r.zcard(key)
            else:
                continue
            if size > threshold:
                bigkeys.append({'key': key, 'type': dtype, 'size': size})
        if cursor == 0:
            break
    return sorted(bigkeys, key=lambda x: x['size'], reverse=True)

bigkey 优化策略

场景问题解决方案
大 String单次传输慢压缩(msgpack/snappy)、分片存储
大 Hash(万级字段)HGETALL 阻塞拆分为多个小 Hash(如按字段范围分片)
大 List/Set/ZSet操作慢、迁移慢按时间或范围分页拆分
删除大 keyDEL 阻塞使用 UNLINK(异步删除)

DEL 大 key 阻塞 Redis:对一个有 100 万个元素的 ZSet 执行 DEL,Redis 需要逐一释放内存,可能阻塞数百毫秒。生产环境应使用 UNLINK 命令,它将删除操作放入后台线程异步执行,主线程立即返回。Redis 4.0+ 支持。

内存优化:淘汰策略

# redis.conf 内存配置
maxmemory 4gb                  # 最大内存限制
maxmemory-policy allkeys-lru   # 淘汰策略
策略淘汰范围算法适用场景
noeviction不淘汰内存溢出直接报错(默认)
allkeys-lru所有 keyLRU 最近最少使用缓存场景(推荐)
volatile-lru有 TTL 的 keyLRU混合存储(部分 key 为永久数据)
allkeys-lfu所有 keyLFU 最不频繁使用有明显冷热分布的缓存
volatile-ttl有 TTL 的 key优先淘汰 TTL 最短的
allkeys-random所有 key随机访问模式均匀时

Redis Functions:Lua 脚本的现代替代

Redis 7.0 引入 Functions,取代临时性的 EVAL/EVALSHA 脚本执行模式。Functions 将 Lua 代码作为库持久化到 Redis 服务端,服务重启后自动加载,无需应用层每次重新注册:

EVAL 脚本 vs Redis Functions 对比 EVAL 方式: 每次请求都传输脚本代码 → Redis 编译执行 → 重启后失效 虽然可用 EVALSHA 避免重复传输,但重启后 SHA 失效 Functions 方式: FUNCTION LOAD 一次加载到服务端 → 函数持久化到 AOF/RDB → 重启自动恢复 → 客户端只传函数名和参数
# 定义 Redis Function(Lua 库)
FUNCTION LOAD #!lua name=mylib

# 在 redis-cli 中加载完整库(使用 heredoc)
redis-cli FUNCTION LOAD "#!lua name=ratelimit
local function check_limit(keys, args)
    local key = keys[1]
    local limit = tonumber(args[1])
    local window = tonumber(args[2])
    local count = redis.call('INCR', key)
    if count == 1 then
        redis.call('EXPIRE', key, window)
    end
    return count <= limit and 1 or 0
end
redis.register_function('check_limit', check_limit)
"

# 调用 Function(语法与 EVALSHA 类似但更清晰)
FCALL check_limit 1 ratelimit:user:1001 100 60

# 查看已加载的函数库
FUNCTION LIST
FUNCTION STATS

# Python 调用
# Python 调用 Redis Functions
import redis

r = redis.Redis(host='localhost', decode_responses=True)

# 加载函数库(只需执行一次,Redis 重启后自动恢复)
RATELIMIT_LIB = """
#!lua name=ratelimit
local function check_limit(keys, args)
    local key = keys[1]
    local limit = tonumber(args[1])
    local window = tonumber(args[2])
    local count = redis.call('INCR', key)
    if count == 1 then
        redis.call('EXPIRE', key, window)
    end
    if count > limit then
        return redis.error_reply('RATE_LIMIT_EXCEEDED')
    end
    return count
end
redis.register_function('check_limit', check_limit)
"""

try:
    r.function_load(RATELIMIT_LIB)
except redis.ResponseError as e:
    if 'already exists' not in str(e):
        raise

# 调用:只传函数名和参数,无需传输代码
try:
    count = r.fcall('check_limit', 1, 'ratelimit:user:1001', 100, 60)
    print(f"当前请求次数: {count}")
except redis.ResponseError as e:
    print(f"限流触发: {e}")

性能优化清单

优化手段场景预期收益
Pipeline 批量命令N 次独立 SET/GET延迟从 N×RTT 降到 1×RTT,提速 10~100×
连接池(ConnectionPool)所有生产场景避免频繁建连开销,节约文件描述符
UNLINK 替代 DEL删除大 key避免阻塞主线程,异步后台删除
SCAN 替代 KEYS遍历所有 key避免全量阻塞,分批处理
allkeys-lru 淘汰策略纯缓存场景自动淘汰冷数据,防止 OOM
listpack 编码保持Hash/ZSet 小数据相比 hashtable 节省 60%+ 内存
Lua/Functions 原子操作复杂条件判断消除 WATCH+MULTI/EXEC 的乐观锁重试
关闭 AOF(纯缓存)无持久化需求写性能提升 30~50%,消除 fsync 开销
本章小结

Redis 性能优化的核心是消灭不必要的开销:
网络层:Pipeline 合并命令减少 RTT;连接池复用连接;客户端部署在同一机房/机器上降低物理延迟。
命令层:用 MSET/MGET 替代多次 SET/GET;用 HSCAN 替代 HGETALL 处理大 Hash;用 UNLINK 替代 DEL 删除大 key;禁用 KEYS *。
脚本层:Lua 脚本和 Redis Functions 将多步操作变为单次原子执行,既减少 RTT 又保证原子性。7.0+ 优先使用 Functions,代码持久化到服务端,更便于维护。
内存层:保持数据结构在 listpack 编码范围内;设置合理的 maxmemory 和淘汰策略;定期用 redis-cli --bigkeys 排查大 key。
慢查询诊断:slowlog-log-slower-than 10000(10ms)记录慢命令,定期分析并优化。