Chapter 10

实战:限流 + Session + 排行榜

滑动窗口限流、JWT Session 管理、ZSet 实时榜单、GEO 位置查询、HyperLogLog UV 统计

实战一:滑动窗口限流

限流是保护后端服务的关键手段。固定窗口限流(如每分钟 100 次)存在窗口边界突刺问题——第 59 秒打 100 次、第 61 秒再打 100 次,实际上 2 秒内发送了 200 次请求。滑动窗口以当前时间为右边界,精确控制任意时间窗口内的请求数。

滑动窗口限流(ZSet 实现) 窗口大小: 60秒,限制: 100次 时间轴: ────────────────────────────────────── [旧请求] [过期] [12:00:00 ~ 12:01:00 窗口] [now] ↑移除过期 ↑加入新请求 ZSet 中存储: member=请求ID(UUID), score=时间戳(ms) 每次请求: 1. ZREMRANGEBYSCORE 删除 (now-window) 之前的过期记录 2. ZCARD 统计当前窗口内的请求数 3. 若 < 限制:ZADD 加入本次请求,放行 4. 若 >= 限制:拒绝,返回 429
import redis, uuid, time
from typing import Tuple

r = redis.Redis(host='localhost', decode_responses=True)

# Lua 脚本保证原子性(检查+写入一步完成)
SLIDING_WINDOW_SCRIPT = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local req_id = ARGV[4]

-- 删除过期的请求记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)

-- 当前窗口内的请求数
local count = redis.call('ZCARD', key)

if count < limit then
    -- 未超限:记录本次请求
    redis.call('ZADD', key, now, req_id)
    redis.call('EXPIRE', key, math.ceil(window/1000) + 1)
    return {1, count + 1}  -- {允许, 当前计数}
else
    return {0, count}      -- {拒绝, 当前计数}
end
"""

sliding_window = r.register_script(SLIDING_WINDOW_SCRIPT)

def rate_limit(identifier: str,
               limit: int = 100,
               window_ms: int = 60000) -> Tuple[bool, int]:
    """
    滑动窗口限流
    :param identifier: 限流标识(用户ID/IP/接口)
    :param limit: 窗口内最大请求数
    :param window_ms: 窗口大小(毫秒)
    :return: (是否放行, 当前窗口内请求数)
    """
    key = f"ratelimit:{identifier}"
    now_ms = int(time.time() * 1000)
    req_id = f"{now_ms}-{uuid.uuid4().hex[:8]}"

    result = sliding_window(
        keys=[key],
        args=[now_ms, window_ms, limit, req_id]
    )
    return result[0] == 1, result[1]

# FastAPI 中间件示例
# from fastapi import Request, HTTPException
# @app.middleware("http")
# async def rate_limit_middleware(request: Request, call_next):
#     client_ip = request.client.host
#     allowed, count = rate_limit(f"ip:{client_ip}", limit=60, window_ms=60000)
#     if not allowed:
#         raise HTTPException(status_code=429, detail="Too Many Requests")
#     response = await call_next(request)
#     response.headers["X-RateLimit-Count"] = str(count)
#     return response

# 测试
for i in range(5):
    allowed, count = rate_limit("user:1001", limit=3, window_ms=10000)
    print(f"请求{i+1}: 放行={allowed}, 计数={count}")

实战二:JWT + Redis Session 管理

纯 JWT 的问题:Token 无法主动失效(如用户退出登录)。结合 Redis,可以实现强制下线、多端登录控制、Token 刷新等功能。

import redis, jwt, uuid, time, hashlib
from datetime import datetime, timedelta
from typing import Optional

r = redis.Redis(host='localhost', decode_responses=True)
SECRET = "your-secret-key"
ACCESS_TTL = 3600    # Access Token 有效期 1小时
REFRESH_TTL = 604800  # Refresh Token 有效期 7天

def create_session(user_id: int, device: str = "web") -> dict:
    """登录:创建 Access Token + Refresh Token"""
    jti = uuid.uuid4().hex  # JWT ID,唯一标识符
    now = int(time.time())

    # Access Token(短期,携带用户信息)
    access_payload = {
        'uid': user_id, 'jti': jti,
        'device': device, 'iat': now,
        'exp': now + ACCESS_TTL
    }
    access_token = jwt.encode(access_payload, SECRET, algorithm='HS256')

    # Refresh Token(长期,用于刷新 Access Token)
    refresh_token = hashlib.sha256(uuid.uuid4().bytes).hexdigest()

    # Redis 存储:记录有效的 Session
    session_key = f"session:{user_id}:{device}"
    refresh_key = f"refresh:{refresh_token}"

    r.hset(session_key, mapping={
        'jti': jti, 'refresh': refresh_token,
        'created_at': now, 'device': device
    })
    r.expire(session_key, REFRESH_TTL)
    r.setex(refresh_key, REFRESH_TTL, user_id)

    return {'access_token': access_token, 'refresh_token': refresh_token}

def verify_token(access_token: str) -> Optional[dict]:
    """验证 Access Token 是否有效"""
    try:
        payload = jwt.decode(access_token, SECRET, algorithms=['HS256'])
        # 检查 Session 是否已被主动撤销
        session_key = f"session:{payload['uid']}:{payload['device']}"
        stored_jti = r.hget(session_key, 'jti')
        if stored_jti != payload['jti']:
            return None  # Token 已被刷新或登出
        return payload
    except jwt.ExpiredSignatureError:
        return None
    except jwt.InvalidTokenError:
        return None

def logout(user_id: int, device: str = "web") -> None:
    """登出:删除 Redis Session(Token 立即失效)"""
    session_key = f"session:{user_id}:{device}"
    refresh = r.hget(session_key, 'refresh')
    if refresh:
        r.delete(f"refresh:{refresh}")
    r.delete(session_key)

实战三:ZSet 实时排行榜

import redis
from datetime import datetime

r = redis.Redis(host='localhost', decode_responses=True)

def leaderboard_key(period: str = 'daily') -> str:
    """按周期生成排行榜 key(日榜/周榜/月榜)"""
    today = datetime.now().strftime('%Y%m%d')
    week = datetime.now().strftime('%Y-W%W')
    month = datetime.now().strftime('%Y%m')
    keys = {'daily': today, 'weekly': week, 'monthly': month}
    return f"rank:{period}:{keys[period]}"

def add_score(user_id: str, score_delta: float,
               periods: list = ['daily', 'weekly', 'monthly']) -> dict:
    """加分:同时更新多个周期的排行榜"""
    with r.pipeline(transaction=False) as pipe:
        for period in periods:
            key = leaderboard_key(period)
            pipe.zincrby(key, score_delta, user_id)
        results = pipe.execute()
    return {p: r for p, r in zip(periods, results)}

def get_leaderboard(period: str = 'daily', top_n: int = 10) -> list:
    """获取 Top N 排行"""
    key = leaderboard_key(period)
    entries = r.zrange(key, 0, top_n - 1, rev=True, withscores=True)
    return [
        {'rank': i + 1, 'user_id': uid, 'score': score}
        for i, (uid, score) in enumerate(entries)
    ]

# 测试
add_score("alice", 100)
add_score("bob", 200)
add_score("alice", 150)  # alice 累计 250
print(get_leaderboard('daily'))
# [{'rank': 1, 'user_id': 'alice', 'score': 250.0},
#  {'rank': 2, 'user_id': 'bob', 'score': 200.0}]

实战四:GEO 地理位置查询

Redis GEO 基于 ZSet 实现,将经纬度编码为 geohash 存入 score,支持距离计算和范围查询(如"附近的门店")。

# 添加商家位置(经度, 纬度, 名称)
GEOADD stores 121.4737 31.2304 "人民广场店"
GEOADD stores 121.5033 31.2380 "外滩店"
GEOADD stores 121.4437 31.1782 "徐汇店"

# 计算两点距离(km)
GEODIST stores "人民广场店" "外滩店" km

# 搜索附近5km内的门店
GEOSEARCH stores FROMMEMBER "人民广场店" BYRADIUS 5 km ASC WITHCOORD WITHDIST COUNT 10
def find_nearby_stores(lng: float, lat: float,
                         radius_km: float = 5.0,
                         max_count: int = 10) -> list:
    """查找附近门店"""
    results = r.geosearch(
        'stores',
        longitude=lng, latitude=lat,
        radius=radius_km, unit='km',
        sort='ASC',
        count=max_count,
        withcoord=True,
        withdist=True
    )
    return [
        {'name': name, 'distance_km': dist, 'location': coord}
        for name, dist, coord in results
    ]

stores = find_nearby_stores(121.473, 31.230)
for store in stores:
    print(f"{store['name']} - {store['distance_km']:.2f}km")

实战五:HyperLogLog 统计 UV

HyperLogLog 是一种概率数据结构,用极少内存(固定 12KB)估算集合基数(不重复元素数量),误差率约 0.81%,非常适合统计网页 UV(独立访客数)。

# 记录访问(无论记录多少次,内存始终 ~12KB)
PFADD page:home:uv user_1001 user_1002 user_1003
PFADD page:home:uv user_1001  # 重复用户,不增加计数

# 获取 UV 数量(近似值)
PFCOUNT page:home:uv
# 合并多个页面的 UV(去重)
PFMERGE site:total:uv page:home:uv page:about:uv page:contact:uv
PFCOUNT site:total:uv
from datetime import date

def track_pv_uv(page: str, user_id: str) -> dict:
    """同时记录 PV(精确)和 UV(近似)"""
    today = date.today().isoformat()
    pv_key = f"pv:{page}:{today}"
    uv_key = f"uv:{page}:{today}"

    with r.pipeline(transaction=False) as pipe:
        pipe.incr(pv_key)          # PV:精确计数
        pipe.expire(pv_key, 86400 * 7)
        pipe.pfadd(uv_key, user_id)  # UV:HyperLogLog 近似
        pipe.expire(uv_key, 86400 * 7)
        pv, _, _, _ = pipe.execute()

    uv = r.pfcount(uv_key)
    return {'pv': pv, 'uv': uv, 'date': today}

# 模拟 100 个用户访问,其中 30 个重复访问
import random
for i in range(130):
    user = f"user_{random.randint(1, 100)}"  # 100个不同用户
    stats = track_pv_uv("home", user)
print(f"PV: {stats['pv']}, UV: {stats['uv']}")
# PV: 130, UV: ~100(HyperLogLog 近似值)

综合实战:商品秒杀系统

综合运用前面所学的知识点,实现一个高并发商品秒杀系统的核心逻辑:

# 秒杀系统核心:原子性扣减库存 + 订单防重
SECKILL_SCRIPT = """
local stock_key = KEYS[1]    -- 库存 key
local order_key = KEYS[2]    -- 已购用户 Set
local user_id = ARGV[1]
local quantity = tonumber(ARGV[2])

-- 1. 检查是否重复购买
if redis.call('SISMEMBER', order_key, user_id) == 1 then
    return -1  -- 已购买
end

-- 2. 检查库存
local stock = tonumber(redis.call('GET', stock_key))
if stock == nil or stock < quantity then
    return 0  -- 库存不足
end

-- 3. 原子扣减库存
redis.call('DECRBY', stock_key, quantity)

-- 4. 记录已购买用户(防止重复购买)
redis.call('SADD', order_key, user_id)

return 1  -- 购买成功
"""

seckill = r.register_script(SECKILL_SCRIPT)

def seckill_product(product_id: str, user_id: str,
                      quantity: int = 1) -> str:
    """
    秒杀购买
    返回: "success" / "duplicate" / "out_of_stock"
    """
    stock_key = f"seckill:stock:{product_id}"
    order_key = f"seckill:orders:{product_id}"

    result = seckill(
        keys=[stock_key, order_key],
        args=[user_id, quantity]
    )
    status_map = {1: "success", 0: "out_of_stock", -1: "duplicate"}
    return status_map.get(result, "error")

# 初始化库存
r.set("seckill:stock:iphone15", 100)

# 模拟并发购买
import concurrent.futures
def buy(user_id): return seckill_product("iphone15", user_id)

with concurrent.futures.ThreadPoolExecutor(max_workers=50) as ex:
    results = list(ex.map(buy, [f"user_{i}" for i in range(200)]))

success = results.count("success")
print(f"成功购买: {success} 个(应等于库存 100)")
print(f"剩余库存: {r.get('seckill:stock:iphone15')}")  # 应为 0

本章知识点总结

场景Redis 方案核心命令/结构
API 限流滑动窗口(ZSet + Lua)ZADD/ZREMRANGEBYSCORE/ZCARD
用户登录 SessionJWT + Redis HashHSET/HGET/EXPIRE/DEL
实时排行榜ZSet 多周期ZINCRBY/ZRANGE REV/ZREVRANK
附近地点查询GEO(基于 ZSet)GEOADD/GEOSEARCH/GEODIST
UV 统计HyperLogLogPFADD/PFCOUNT/PFMERGE
秒杀防超卖Lua 脚本原子操作register_script/DECRBY/SISMEMBER

恭喜完成 Redis 实战教程! 你已经掌握了 Redis 从核心数据结构到集群部署的完整知识链。实际项目中,Redis 的价值在于用合适的数据结构解决合适的问题——记住:用最简单的方案解决问题,而非将所有场景都塞进 Redis。