实战一:滑动窗口限流
限流是保护后端服务的关键手段。固定窗口限流(如每分钟 100 次)存在窗口边界突刺问题——第 59 秒打 100 次、第 61 秒再打 100 次,实际上 2 秒内发送了 200 次请求。滑动窗口以当前时间为右边界,精确控制任意时间窗口内的请求数。
滑动窗口限流(ZSet 实现)
窗口大小: 60秒,限制: 100次
时间轴: ──────────────────────────────────────
[旧请求] [过期] [12:00:00 ~ 12:01:00 窗口] [now]
↑移除过期 ↑加入新请求
ZSet 中存储: member=请求ID(UUID), score=时间戳(ms)
每次请求:
1. ZREMRANGEBYSCORE 删除 (now-window) 之前的过期记录
2. ZCARD 统计当前窗口内的请求数
3. 若 < 限制:ZADD 加入本次请求,放行
4. 若 >= 限制:拒绝,返回 429
import redis, uuid, time
from typing import Tuple
r = redis.Redis(host='localhost', decode_responses=True)
# Lua 脚本保证原子性(检查+写入一步完成)
SLIDING_WINDOW_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local req_id = ARGV[4]
-- 删除过期的请求记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
-- 当前窗口内的请求数
local count = redis.call('ZCARD', key)
if count < limit then
-- 未超限:记录本次请求
redis.call('ZADD', key, now, req_id)
redis.call('EXPIRE', key, math.ceil(window/1000) + 1)
return {1, count + 1} -- {允许, 当前计数}
else
return {0, count} -- {拒绝, 当前计数}
end
"""
sliding_window = r.register_script(SLIDING_WINDOW_SCRIPT)
def rate_limit(identifier: str,
limit: int = 100,
window_ms: int = 60000) -> Tuple[bool, int]:
"""
滑动窗口限流
:param identifier: 限流标识(用户ID/IP/接口)
:param limit: 窗口内最大请求数
:param window_ms: 窗口大小(毫秒)
:return: (是否放行, 当前窗口内请求数)
"""
key = f"ratelimit:{identifier}"
now_ms = int(time.time() * 1000)
req_id = f"{now_ms}-{uuid.uuid4().hex[:8]}"
result = sliding_window(
keys=[key],
args=[now_ms, window_ms, limit, req_id]
)
return result[0] == 1, result[1]
# FastAPI 中间件示例
# from fastapi import Request, HTTPException
# @app.middleware("http")
# async def rate_limit_middleware(request: Request, call_next):
# client_ip = request.client.host
# allowed, count = rate_limit(f"ip:{client_ip}", limit=60, window_ms=60000)
# if not allowed:
# raise HTTPException(status_code=429, detail="Too Many Requests")
# response = await call_next(request)
# response.headers["X-RateLimit-Count"] = str(count)
# return response
# 测试
for i in range(5):
allowed, count = rate_limit("user:1001", limit=3, window_ms=10000)
print(f"请求{i+1}: 放行={allowed}, 计数={count}")
实战二:JWT + Redis Session 管理
纯 JWT 的问题:Token 无法主动失效(如用户退出登录)。结合 Redis,可以实现强制下线、多端登录控制、Token 刷新等功能。
import redis, jwt, uuid, time, hashlib
from datetime import datetime, timedelta
from typing import Optional
r = redis.Redis(host='localhost', decode_responses=True)
SECRET = "your-secret-key"
ACCESS_TTL = 3600 # Access Token 有效期 1小时
REFRESH_TTL = 604800 # Refresh Token 有效期 7天
def create_session(user_id: int, device: str = "web") -> dict:
"""登录:创建 Access Token + Refresh Token"""
jti = uuid.uuid4().hex # JWT ID,唯一标识符
now = int(time.time())
# Access Token(短期,携带用户信息)
access_payload = {
'uid': user_id, 'jti': jti,
'device': device, 'iat': now,
'exp': now + ACCESS_TTL
}
access_token = jwt.encode(access_payload, SECRET, algorithm='HS256')
# Refresh Token(长期,用于刷新 Access Token)
refresh_token = hashlib.sha256(uuid.uuid4().bytes).hexdigest()
# Redis 存储:记录有效的 Session
session_key = f"session:{user_id}:{device}"
refresh_key = f"refresh:{refresh_token}"
r.hset(session_key, mapping={
'jti': jti, 'refresh': refresh_token,
'created_at': now, 'device': device
})
r.expire(session_key, REFRESH_TTL)
r.setex(refresh_key, REFRESH_TTL, user_id)
return {'access_token': access_token, 'refresh_token': refresh_token}
def verify_token(access_token: str) -> Optional[dict]:
"""验证 Access Token 是否有效"""
try:
payload = jwt.decode(access_token, SECRET, algorithms=['HS256'])
# 检查 Session 是否已被主动撤销
session_key = f"session:{payload['uid']}:{payload['device']}"
stored_jti = r.hget(session_key, 'jti')
if stored_jti != payload['jti']:
return None # Token 已被刷新或登出
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def logout(user_id: int, device: str = "web") -> None:
"""登出:删除 Redis Session(Token 立即失效)"""
session_key = f"session:{user_id}:{device}"
refresh = r.hget(session_key, 'refresh')
if refresh:
r.delete(f"refresh:{refresh}")
r.delete(session_key)
实战三:ZSet 实时排行榜
import redis
from datetime import datetime
r = redis.Redis(host='localhost', decode_responses=True)
def leaderboard_key(period: str = 'daily') -> str:
"""按周期生成排行榜 key(日榜/周榜/月榜)"""
today = datetime.now().strftime('%Y%m%d')
week = datetime.now().strftime('%Y-W%W')
month = datetime.now().strftime('%Y%m')
keys = {'daily': today, 'weekly': week, 'monthly': month}
return f"rank:{period}:{keys[period]}"
def add_score(user_id: str, score_delta: float,
periods: list = ['daily', 'weekly', 'monthly']) -> dict:
"""加分:同时更新多个周期的排行榜"""
with r.pipeline(transaction=False) as pipe:
for period in periods:
key = leaderboard_key(period)
pipe.zincrby(key, score_delta, user_id)
results = pipe.execute()
return {p: r for p, r in zip(periods, results)}
def get_leaderboard(period: str = 'daily', top_n: int = 10) -> list:
"""获取 Top N 排行"""
key = leaderboard_key(period)
entries = r.zrange(key, 0, top_n - 1, rev=True, withscores=True)
return [
{'rank': i + 1, 'user_id': uid, 'score': score}
for i, (uid, score) in enumerate(entries)
]
# 测试
add_score("alice", 100)
add_score("bob", 200)
add_score("alice", 150) # alice 累计 250
print(get_leaderboard('daily'))
# [{'rank': 1, 'user_id': 'alice', 'score': 250.0},
# {'rank': 2, 'user_id': 'bob', 'score': 200.0}]
实战四:GEO 地理位置查询
Redis GEO 基于 ZSet 实现,将经纬度编码为 geohash 存入 score,支持距离计算和范围查询(如"附近的门店")。
# 添加商家位置(经度, 纬度, 名称)
GEOADD stores 121.4737 31.2304 "人民广场店"
GEOADD stores 121.5033 31.2380 "外滩店"
GEOADD stores 121.4437 31.1782 "徐汇店"
# 计算两点距离(km)
GEODIST stores "人民广场店" "外滩店" km
# 搜索附近5km内的门店
GEOSEARCH stores FROMMEMBER "人民广场店" BYRADIUS 5 km ASC WITHCOORD WITHDIST COUNT 10
def find_nearby_stores(lng: float, lat: float,
radius_km: float = 5.0,
max_count: int = 10) -> list:
"""查找附近门店"""
results = r.geosearch(
'stores',
longitude=lng, latitude=lat,
radius=radius_km, unit='km',
sort='ASC',
count=max_count,
withcoord=True,
withdist=True
)
return [
{'name': name, 'distance_km': dist, 'location': coord}
for name, dist, coord in results
]
stores = find_nearby_stores(121.473, 31.230)
for store in stores:
print(f"{store['name']} - {store['distance_km']:.2f}km")
实战五:HyperLogLog 统计 UV
HyperLogLog 是一种概率数据结构,用极少内存(固定 12KB)估算集合基数(不重复元素数量),误差率约 0.81%,非常适合统计网页 UV(独立访客数)。
# 记录访问(无论记录多少次,内存始终 ~12KB)
PFADD page:home:uv user_1001 user_1002 user_1003
PFADD page:home:uv user_1001 # 重复用户,不增加计数
# 获取 UV 数量(近似值)
PFCOUNT page:home:uv
# 合并多个页面的 UV(去重)
PFMERGE site:total:uv page:home:uv page:about:uv page:contact:uv
PFCOUNT site:total:uv
from datetime import date
def track_pv_uv(page: str, user_id: str) -> dict:
"""同时记录 PV(精确)和 UV(近似)"""
today = date.today().isoformat()
pv_key = f"pv:{page}:{today}"
uv_key = f"uv:{page}:{today}"
with r.pipeline(transaction=False) as pipe:
pipe.incr(pv_key) # PV:精确计数
pipe.expire(pv_key, 86400 * 7)
pipe.pfadd(uv_key, user_id) # UV:HyperLogLog 近似
pipe.expire(uv_key, 86400 * 7)
pv, _, _, _ = pipe.execute()
uv = r.pfcount(uv_key)
return {'pv': pv, 'uv': uv, 'date': today}
# 模拟 100 个用户访问,其中 30 个重复访问
import random
for i in range(130):
user = f"user_{random.randint(1, 100)}" # 100个不同用户
stats = track_pv_uv("home", user)
print(f"PV: {stats['pv']}, UV: {stats['uv']}")
# PV: 130, UV: ~100(HyperLogLog 近似值)
综合实战:商品秒杀系统
综合运用前面所学的知识点,实现一个高并发商品秒杀系统的核心逻辑:
# 秒杀系统核心:原子性扣减库存 + 订单防重
SECKILL_SCRIPT = """
local stock_key = KEYS[1] -- 库存 key
local order_key = KEYS[2] -- 已购用户 Set
local user_id = ARGV[1]
local quantity = tonumber(ARGV[2])
-- 1. 检查是否重复购买
if redis.call('SISMEMBER', order_key, user_id) == 1 then
return -1 -- 已购买
end
-- 2. 检查库存
local stock = tonumber(redis.call('GET', stock_key))
if stock == nil or stock < quantity then
return 0 -- 库存不足
end
-- 3. 原子扣减库存
redis.call('DECRBY', stock_key, quantity)
-- 4. 记录已购买用户(防止重复购买)
redis.call('SADD', order_key, user_id)
return 1 -- 购买成功
"""
seckill = r.register_script(SECKILL_SCRIPT)
def seckill_product(product_id: str, user_id: str,
quantity: int = 1) -> str:
"""
秒杀购买
返回: "success" / "duplicate" / "out_of_stock"
"""
stock_key = f"seckill:stock:{product_id}"
order_key = f"seckill:orders:{product_id}"
result = seckill(
keys=[stock_key, order_key],
args=[user_id, quantity]
)
status_map = {1: "success", 0: "out_of_stock", -1: "duplicate"}
return status_map.get(result, "error")
# 初始化库存
r.set("seckill:stock:iphone15", 100)
# 模拟并发购买
import concurrent.futures
def buy(user_id): return seckill_product("iphone15", user_id)
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as ex:
results = list(ex.map(buy, [f"user_{i}" for i in range(200)]))
success = results.count("success")
print(f"成功购买: {success} 个(应等于库存 100)")
print(f"剩余库存: {r.get('seckill:stock:iphone15')}") # 应为 0
本章知识点总结
| 场景 | Redis 方案 | 核心命令/结构 |
|---|---|---|
| API 限流 | 滑动窗口(ZSet + Lua) | ZADD/ZREMRANGEBYSCORE/ZCARD |
| 用户登录 Session | JWT + Redis Hash | HSET/HGET/EXPIRE/DEL |
| 实时排行榜 | ZSet 多周期 | ZINCRBY/ZRANGE REV/ZREVRANK |
| 附近地点查询 | GEO(基于 ZSet) | GEOADD/GEOSEARCH/GEODIST |
| UV 统计 | HyperLogLog | PFADD/PFCOUNT/PFMERGE |
| 秒杀防超卖 | Lua 脚本原子操作 | register_script/DECRBY/SISMEMBER |
恭喜完成 Redis 实战教程! 你已经掌握了 Redis 从核心数据结构到集群部署的完整知识链。实际项目中,Redis 的价值在于用合适的数据结构解决合适的问题——记住:用最简单的方案解决问题,而非将所有场景都塞进 Redis。