Cache-Aside 模式
Cache-Aside(旁路缓存)是最常用的缓存读写模式,应用层同时管理缓存和数据库,Redis 作为透明的加速层。
Cache-Aside 读流程
Application
↓ 读请求
┌────────────────────────────────────────────────┐
│ 1. 查询 Redis │
│ ↓ 命中? │
│ YES ──▶ 直接返回缓存数据 ✓ │
│ NO ──▶ 2. 查询数据库 │
│ ↓ │
│ 3. 将数据写入 Redis(设置 TTL) │
│ ↓ │
│ 4. 返回数据给用户 │
└────────────────────────────────────────────────┘
Cache-Aside 写流程(先写库,再删缓存)
Application
↓ 写请求
1. 更新数据库(主操作)
↓
2. 删除 Redis 缓存(而非更新!)
↓
下次读请求时从库重新加载到缓存
为何写时删缓存而非更新缓存? 如果直接更新缓存,在高并发下两个写请求可能先后写库,但写缓存的顺序相反,导致缓存和库不一致。删除操作是幂等的,无此问题。
Cache-Aside 完整实现(Python)
import redis, json
from typing import Optional, Callable, TypeVar
r = redis.Redis(host='localhost', decode_responses=True)
T = TypeVar('T')
class CacheAsideManager:
def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
self.r = redis_client
self.default_ttl = default_ttl
def get(self, key: str, loader: Callable[[], Optional[T]],
ttl: Optional[int] = None) -> Optional[T]:
"""读操作:先查缓存,未命中则从 loader 加载并写入缓存"""
# 1. 查询缓存
cached = self.r.get(key)
if cached is not None:
return json.loads(cached)
# 2. 缓存未命中,查询数据源
data = loader()
if data is not None:
# 3. 写入缓存
self.r.setex(
key,
ttl or self.default_ttl,
json.dumps(data, ensure_ascii=False)
)
return data
def invalidate(self, key: str) -> None:
"""写操作后使缓存失效"""
self.r.delete(key)
def update(self, key: str,
db_update: Callable[[], bool]) -> bool:
"""先写库,再删缓存(标准写流程)"""
success = db_update()
if success:
self.invalidate(key)
return success
# 使用示例(模拟数据库查询)
cache = CacheAsideManager(r, default_ttl=1800)
def get_product(product_id: int) -> Optional[dict]:
key = f"product:{product_id}"
def load_from_db():
# 实际项目中这里是数据库查询
return {'id': product_id, 'name': 'Redis 入门书', 'price': 99}
return cache.get(key, load_from_db)
三大缓存问题
1. 缓存穿透:查询不存在的数据
恶意用户不断请求数据库中不存在的 key(如 id=-1),每次都绕过缓存直接打到数据库,数据库压力骤增。
缓存穿透
攻击者: GET product:-1
↓
Redis: 不存在 → miss
↓ 每次都穿透
数据库: 不存在 → null ← 大量查询拖垮数据库!
解决方案一:缓存空值(简单但有内存风险)
def get_with_null_cache(key: str, loader: Callable) -> Optional[dict]:
cached = r.get(key)
if cached == "__null__":
return None # 命中空值缓存,直接返回 None
if cached is not None:
return json.loads(cached)
data = loader()
if data is None:
# 缓存空值,设置较短 TTL(避免内存泄漏)
r.setex(key, 300, "__null__")
else:
r.setex(key, 3600, json.dumps(data))
return data
解决方案二:布隆过滤器(生产推荐)
# 需要 Redis Stack 或 RedisBloom 模块
# pip install redis[hiredis]
from redis.commands.bf import BFCreate
# 初始化布隆过滤器(预估100万个元素,误判率0.1%)
r.bf().create('product:bloom', errorRate=0.001, capacity=1000000)
# 启动时将所有合法 ID 加入布隆过滤器
for product_id in get_all_product_ids():
r.bf().add('product:bloom', product_id)
def get_product_safe(product_id: int) -> Optional[dict]:
# 布隆过滤器说不存在,一定不存在;说存在,可能存在(有误判)
if not r.bf().exists('product:bloom', product_id):
return None # 直接拦截,不查缓存和数据库
return get_product(product_id)
2. 缓存击穿:热点 key 过期瞬间的雪崩
一个被大量并发请求的热点 key 突然过期,数千个请求同时穿透到数据库,数据库瞬间压力激增。
解决方案:互斥锁(Mutex)
import time
def get_hot_data(key: str, loader: Callable,
lock_timeout: int = 5) -> Optional[dict]:
"""使用互斥锁防止缓存击穿"""
# 1. 先查缓存
cached = r.get(key)
if cached:
return json.loads(cached)
# 2. 缓存未命中,尝试获取互斥锁
lock_key = f"lock:{key}"
lock_acquired = r.set(lock_key, 1, nx=True, ex=lock_timeout)
if lock_acquired:
try:
# 3. 获得锁:查询数据库并更新缓存
data = loader()
if data:
r.setex(key, 3600, json.dumps(data))
return data
finally:
r.delete(lock_key) # 释放锁
else:
# 4. 未获得锁:短暂等待后重试
time.sleep(0.05)
return get_hot_data(key, loader, lock_timeout)
3. 缓存雪崩:大量 key 同时过期
系统启动时批量写入了大量相同 TTL 的缓存,到时间点后集体过期,所有请求涌入数据库,引发连锁崩溃。
缓存雪崩示意
18:00 写入 1000 个 key,TTL=3600
↓
19:00 1000 个 key 同时过期!
↓
数据库: 承受 10000 QPS → 宕机
解决方案:随机 TTL 打散过期时间
import random
def set_with_jitter(key: str, value: dict,
base_ttl: int = 3600,
jitter: int = 600) -> None:
"""在基础 TTL 上加随机抖动,防止集体过期"""
ttl = base_ttl + random.randint(0, jitter)
r.setex(key, ttl, json.dumps(value))
# 批量写入时每个 key 的过期时间在 3600~4200 秒之间随机分布
for product_id in product_ids:
product = load_product_from_db(product_id)
set_with_jitter(f"product:{product_id}", product, base_ttl=3600)
热点 Key 问题
某个 key 的访问量极高(如微博热搜、双十一秒杀商品),单个 Redis 节点可能成为瓶颈,即便是 10 万 QPS 的 Redis 也可能不够。
解决方案:本地缓存 + Redis 多级缓存
from cachetools import TTLCache
import threading
# 本地 LRU 缓存(进程级,不共享)
local_cache = TTLCache(maxsize=1000, ttl=10) # 最多1000个key,10秒TTL
local_lock = threading.RLock()
def get_hot_product(product_id: int) -> Optional[dict]:
key = f"product:{product_id}"
# 第一层:本地内存缓存(纳秒级)
with local_lock:
if key in local_cache:
return local_cache[key]
# 第二层:Redis 缓存(微秒级)
cached = r.get(key)
if cached:
data = json.loads(cached)
with local_lock:
local_cache[key] = data
return data
# 第三层:数据库(毫秒级)
data = load_from_db(product_id)
if data:
r.setex(key, 3600, json.dumps(data))
with local_lock:
local_cache[key] = data
return data
缓存策略速查表
| 问题 | 现象 | 解决方案 | 适用场景 |
|---|---|---|---|
| 缓存穿透 | 大量请求打到 DB,但数据不存在 | 布隆过滤器 / 缓存空值 | 数据有大量非法查询 |
| 缓存击穿 | 热点 key 过期瞬间 DB 压力飙升 | 互斥锁 / 逻辑过期 | 少量极热的 key |
| 缓存雪崩 | 大量 key 同时过期,DB 崩溃 | 随机 TTL / 永不过期 + 异步刷新 | 批量加载的缓存 |
| 热点 key | 单个 Redis 节点 CPU 100% | 本地缓存 / key 复制到多节点 | 极高并发读 |
| 大 key | 网络传输慢,命令执行阻塞 | 拆分 key / 压缩 value | value 超过 10KB |
先删缓存再写库的双删策略陷阱:某些教程推荐"先删缓存,再写库,再删缓存"的延迟双删方案。这种方案存在第二次删除时机难以确定的问题,且在写库期间有短暂不一致窗口。更推荐的方案是:先写库,再删缓存,并配合 Canal/Binlog 监听数据库变更来异步刷新缓存,实现最终一致性。