Chapter 04

缓存策略:设计与实践

Cache-Aside 模式、TTL 策略,以及缓存穿透、击穿、雪崩三大难题的生产级解决方案

Cache-Aside 模式

Cache-Aside(旁路缓存)是最常用的缓存读写模式,应用层同时管理缓存和数据库,Redis 作为透明的加速层。

Cache-Aside 读流程 Application ↓ 读请求 ┌────────────────────────────────────────────────┐ │ 1. 查询 Redis │ │ ↓ 命中? │ │ YES ──▶ 直接返回缓存数据 ✓ │ │ NO ──▶ 2. 查询数据库 │ │ ↓ │ │ 3. 将数据写入 Redis(设置 TTL) │ │ ↓ │ │ 4. 返回数据给用户 │ └────────────────────────────────────────────────┘ Cache-Aside 写流程(先写库,再删缓存) Application ↓ 写请求 1. 更新数据库(主操作) 2. 删除 Redis 缓存(而非更新!) 下次读请求时从库重新加载到缓存

为何写时删缓存而非更新缓存? 如果直接更新缓存,在高并发下两个写请求可能先后写库,但写缓存的顺序相反,导致缓存和库不一致。删除操作是幂等的,无此问题。

Cache-Aside 完整实现(Python)

import redis, json
from typing import Optional, Callable, TypeVar

r = redis.Redis(host='localhost', decode_responses=True)
T = TypeVar('T')

class CacheAsideManager:
    def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
        self.r = redis_client
        self.default_ttl = default_ttl

    def get(self, key: str, loader: Callable[[], Optional[T]],
             ttl: Optional[int] = None) -> Optional[T]:
        """读操作:先查缓存,未命中则从 loader 加载并写入缓存"""
        # 1. 查询缓存
        cached = self.r.get(key)
        if cached is not None:
            return json.loads(cached)

        # 2. 缓存未命中,查询数据源
        data = loader()
        if data is not None:
            # 3. 写入缓存
            self.r.setex(
                key,
                ttl or self.default_ttl,
                json.dumps(data, ensure_ascii=False)
            )
        return data

    def invalidate(self, key: str) -> None:
        """写操作后使缓存失效"""
        self.r.delete(key)

    def update(self, key: str,
                db_update: Callable[[], bool]) -> bool:
        """先写库,再删缓存(标准写流程)"""
        success = db_update()
        if success:
            self.invalidate(key)
        return success

# 使用示例(模拟数据库查询)
cache = CacheAsideManager(r, default_ttl=1800)

def get_product(product_id: int) -> Optional[dict]:
    key = f"product:{product_id}"

    def load_from_db():
        # 实际项目中这里是数据库查询
        return {'id': product_id, 'name': 'Redis 入门书', 'price': 99}

    return cache.get(key, load_from_db)

三大缓存问题

1. 缓存穿透:查询不存在的数据

恶意用户不断请求数据库中不存在的 key(如 id=-1),每次都绕过缓存直接打到数据库,数据库压力骤增。

缓存穿透 攻击者: GET product:-1 Redis: 不存在 → miss ↓ 每次都穿透 数据库: 不存在 → null ← 大量查询拖垮数据库!

解决方案一:缓存空值(简单但有内存风险)

def get_with_null_cache(key: str, loader: Callable) -> Optional[dict]:
    cached = r.get(key)
    if cached == "__null__":
        return None  # 命中空值缓存,直接返回 None
    if cached is not None:
        return json.loads(cached)

    data = loader()
    if data is None:
        # 缓存空值,设置较短 TTL(避免内存泄漏)
        r.setex(key, 300, "__null__")
    else:
        r.setex(key, 3600, json.dumps(data))
    return data

解决方案二:布隆过滤器(生产推荐)

# 需要 Redis Stack 或 RedisBloom 模块
# pip install redis[hiredis]
from redis.commands.bf import BFCreate

# 初始化布隆过滤器(预估100万个元素,误判率0.1%)
r.bf().create('product:bloom', errorRate=0.001, capacity=1000000)

# 启动时将所有合法 ID 加入布隆过滤器
for product_id in get_all_product_ids():
    r.bf().add('product:bloom', product_id)

def get_product_safe(product_id: int) -> Optional[dict]:
    # 布隆过滤器说不存在,一定不存在;说存在,可能存在(有误判)
    if not r.bf().exists('product:bloom', product_id):
        return None  # 直接拦截,不查缓存和数据库
    return get_product(product_id)

2. 缓存击穿:热点 key 过期瞬间的雪崩

一个被大量并发请求的热点 key 突然过期,数千个请求同时穿透到数据库,数据库瞬间压力激增。

解决方案:互斥锁(Mutex)

import time

def get_hot_data(key: str, loader: Callable,
                  lock_timeout: int = 5) -> Optional[dict]:
    """使用互斥锁防止缓存击穿"""
    # 1. 先查缓存
    cached = r.get(key)
    if cached:
        return json.loads(cached)

    # 2. 缓存未命中,尝试获取互斥锁
    lock_key = f"lock:{key}"
    lock_acquired = r.set(lock_key, 1, nx=True, ex=lock_timeout)

    if lock_acquired:
        try:
            # 3. 获得锁:查询数据库并更新缓存
            data = loader()
            if data:
                r.setex(key, 3600, json.dumps(data))
            return data
        finally:
            r.delete(lock_key)  # 释放锁
    else:
        # 4. 未获得锁:短暂等待后重试
        time.sleep(0.05)
        return get_hot_data(key, loader, lock_timeout)

3. 缓存雪崩:大量 key 同时过期

系统启动时批量写入了大量相同 TTL 的缓存,到时间点后集体过期,所有请求涌入数据库,引发连锁崩溃。

缓存雪崩示意 18:00 写入 1000 个 key,TTL=3600 19:00 1000 个 key 同时过期! 数据库: 承受 10000 QPS → 宕机

解决方案:随机 TTL 打散过期时间

import random

def set_with_jitter(key: str, value: dict,
                     base_ttl: int = 3600,
                     jitter: int = 600) -> None:
    """在基础 TTL 上加随机抖动,防止集体过期"""
    ttl = base_ttl + random.randint(0, jitter)
    r.setex(key, ttl, json.dumps(value))

# 批量写入时每个 key 的过期时间在 3600~4200 秒之间随机分布
for product_id in product_ids:
    product = load_product_from_db(product_id)
    set_with_jitter(f"product:{product_id}", product, base_ttl=3600)

热点 Key 问题

某个 key 的访问量极高(如微博热搜、双十一秒杀商品),单个 Redis 节点可能成为瓶颈,即便是 10 万 QPS 的 Redis 也可能不够。

解决方案:本地缓存 + Redis 多级缓存

from cachetools import TTLCache
import threading

# 本地 LRU 缓存(进程级,不共享)
local_cache = TTLCache(maxsize=1000, ttl=10)  # 最多1000个key,10秒TTL
local_lock = threading.RLock()

def get_hot_product(product_id: int) -> Optional[dict]:
    key = f"product:{product_id}"

    # 第一层:本地内存缓存(纳秒级)
    with local_lock:
        if key in local_cache:
            return local_cache[key]

    # 第二层:Redis 缓存(微秒级)
    cached = r.get(key)
    if cached:
        data = json.loads(cached)
        with local_lock:
            local_cache[key] = data
        return data

    # 第三层:数据库(毫秒级)
    data = load_from_db(product_id)
    if data:
        r.setex(key, 3600, json.dumps(data))
        with local_lock:
            local_cache[key] = data
    return data

缓存策略速查表

问题现象解决方案适用场景
缓存穿透大量请求打到 DB,但数据不存在布隆过滤器 / 缓存空值数据有大量非法查询
缓存击穿热点 key 过期瞬间 DB 压力飙升互斥锁 / 逻辑过期少量极热的 key
缓存雪崩大量 key 同时过期,DB 崩溃随机 TTL / 永不过期 + 异步刷新批量加载的缓存
热点 key单个 Redis 节点 CPU 100%本地缓存 / key 复制到多节点极高并发读
大 key网络传输慢,命令执行阻塞拆分 key / 压缩 valuevalue 超过 10KB

先删缓存再写库的双删策略陷阱:某些教程推荐"先删缓存,再写库,再删缓存"的延迟双删方案。这种方案存在第二次删除时机难以确定的问题,且在写库期间有短暂不一致窗口。更推荐的方案是:先写库,再删缓存,并配合 Canal/Binlog 监听数据库变更来异步刷新缓存,实现最终一致性。