List:双向链表与消息队列
Redis 的 List 是一个有序的字符串列表,支持从两端快速插入和弹出,天然适合队列(FIFO)和栈(LIFO)模式。
底层结构:quicklist
quicklist 兼顾了 linkedlist 的快速插删和 listpack 的内存紧凑性:两端操作 O(1),中间查找 O(N)(不常用)。
List 命令速查表
| 命令 | 语法 | 说明 | 复杂度 |
|---|---|---|---|
LPUSH | LPUSH key val [val ...] | 从左端插入(头部),返回列表长度 | O(1) |
RPUSH | RPUSH key val [val ...] | 从右端插入(尾部) | O(1) |
LPOP | LPOP key [count] | 从左端弹出 | O(N) |
RPOP | RPOP key [count] | 从右端弹出 | O(N) |
BLPOP | BLPOP key [key ...] timeout | 阻塞式左端弹出,队列为空时等待 | O(1) |
BRPOP | BRPOP key [key ...] timeout | 阻塞式右端弹出 | O(1) |
LRANGE | LRANGE key start stop | 获取范围内元素,-1 表示最后一个 | O(S+N) |
LLEN | LLEN key | 列表长度 | O(1) |
LINDEX | LINDEX key index | 获取指定索引的元素 | O(N) |
LINSERT | LINSERT key BEFORE|AFTER pivot val | 在某元素前/后插入 | O(N) |
LREM | LREM key count val | 删除 count 个值为 val 的元素 | O(N+M) |
LMOVE | LMOVE src dst LEFT|RIGHT LEFT|RIGHT | 原子移动元素到另一个列表(7.x) | O(1) |
消息队列模式(Python 实现)
import redis, json, time, threading
r = redis.Redis(host='localhost', decode_responses=True)
# 生产者:向队列推送任务
def producer(queue_name: str, task: dict) -> None:
payload = json.dumps(task, ensure_ascii=False)
r.rpush(queue_name, payload)
print(f"[Producer] 推送任务: {task}")
# 消费者:阻塞式消费(推荐,避免轮询浪费 CPU)
def consumer(queue_name: str) -> None:
print(f"[Consumer] 开始监听队列: {queue_name}")
while True:
# BLPOP:队列为空时阻塞等待,超时时间 0 表示永久等待
result = r.blpop(queue_name, timeout=5)
if result:
_, payload = result
task = json.loads(payload)
print(f"[Consumer] 处理任务: {task}")
process_task(task) # 实际处理逻辑
def process_task(task: dict) -> None:
time.sleep(0.1) # 模拟处理
print(f"[Worker] 完成: {task['id']}")
# 启动生产者和消费者
t = threading.Thread(target=consumer, args=('task:queue',), daemon=True)
t.start()
for i in range(5):
producer('task:queue', {'id': i, 'type': 'email', 'to': f'user{i}@test.com'})
List 消息队列的致命缺陷:消费者 BLPOP 弹出消息后如果进程崩溃,消息会丢失!List 没有 ACK 机制。生产环境应使用 Redis Stream(第6章)或专业消息队列(Kafka/RabbitMQ)。若必须用 List,可将弹出的消息同时备份到另一个"处理中"列表,处理完成后再删除。
Set:无序集合与集合运算
Set 是无重复元素的无序集合,支持高效的集合运算(交集、并集、差集),适合标签系统、好友关系、抽奖等场景。
Set 命令速查表
| 命令 | 语法 | 说明 |
|---|---|---|
SADD | SADD key member [member ...] | 添加元素,返回新增数量 |
SREM | SREM key member [member ...] | 删除元素 |
SMEMBERS | SMEMBERS key | 获取所有元素(大集合慎用) |
SISMEMBER | SISMEMBER key member | 判断元素是否存在 |
SMISMEMBER | SMISMEMBER key m1 m2 ... | 批量判断(7.x) |
SCARD | SCARD key | 集合元素数量 |
SPOP | SPOP key [count] | 随机弹出元素(抽奖) |
SRANDMEMBER | SRANDMEMBER key [count] | 随机获取元素(不删除) |
SUNION | SUNION key [key ...] | 并集 |
SINTER | SINTER key [key ...] | 交集(共同好友) |
SDIFF | SDIFF key [key ...] | 差集(我有你没有) |
SUNIONSTORE | SUNIONSTORE dest key [key ...] | 并集结果存入新 key |
SINTERSTORE | SINTERSTORE dest key [key ...] | 交集结果存入新 key |
SINTERCARD | SINTERCARD numkeys key ... [LIMIT n] | 返回交集基数(7.x,不返回元素) |
# 社交网络:共同好友计算
SADD friends:alice bob charlie dave eve
SADD friends:bob alice charlie frank
# 共同好友(交集)
SINTER friends:alice friends:bob
# 返回: {"charlie"}
# 推荐好友(bob 的好友中 alice 还没认识的)
SDIFF friends:bob friends:alice
# 返回: {"frank"}(排除了alice自己)
# 快速判断是否共同关注(交集基数,无需传输所有元素)
SINTERCARD 2 friends:alice friends:bob LIMIT 10
# 返回: 1
ZSet:有序集合与跳表
ZSet(Sorted Set)是 Redis 最独特的数据结构。每个元素有一个浮点数 score(分值),元素按 score 从小到大自动排序,同时支持按 score 范围查询和按 member 查询 score,复杂度均为 O(log N)。
底层实现:跳表(Skip List)
为何选择跳表而非红黑树?
- 范围查询效率:跳表底层是链表,
ZRANGEBYSCORE定位起点后直接顺序遍历,红黑树需要中序遍历 - 实现简单:跳表代码量约为红黑树的 1/3,更易维护和调试
- 内存局部性:跳表节点在插入顺序上连续,缓存命中率更高
ZSet 命令速查表
| 命令 | 语法 | 说明 |
|---|---|---|
ZADD | ZADD key [NX|XX] [GT|LT] [CH] score member ... | 添加/更新元素,GT/LT 仅在新分值更大/小时更新 |
ZSCORE | ZSCORE key member | 获取元素分值 |
ZINCRBY | ZINCRBY key delta member | 原子增加分值 |
ZRANK | ZRANK key member [WITHSCORE] | 获取排名(升序,从0开始) |
ZREVRANK | ZREVRANK key member [WITHSCORE] | 获取排名(降序) |
ZRANGE | ZRANGE key min max [BYSCORE|BYLEX] [REV] [LIMIT] [WITHSCORES] | 7.x 统一范围查询命令 |
ZRANGEBYSCORE | ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT] | 按分值范围查询(被 ZRANGE BYSCORE 替代) |
ZRANGEBYLEX | ZRANGEBYLEX key min max | 按字典序范围查询(分值相同时) |
ZCARD | ZCARD key | 元素总数 |
ZCOUNT | ZCOUNT key min max | 分值范围内元素数量 |
ZPOPMIN | ZPOPMIN key [count] | 弹出分值最小的元素 |
ZPOPMAX | ZPOPMAX key [count] | 弹出分值最大的元素 |
ZREM | ZREM key member [member ...] | 删除元素 |
ZREMRANGEBYSCORE | ZREMRANGEBYSCORE key min max | 删除分值范围内的元素 |
排行榜实现(完整代码)
import redis
from typing import List, Tuple
r = redis.Redis(host='localhost', decode_responses=True)
LEADERBOARD_KEY = "game:leaderboard"
def submit_score(player: str, score: float) -> int:
"""提交分数,GT 选项确保只有更高分才会更新"""
r.zadd(LEADERBOARD_KEY, {player: score}, gt=True)
# 获取当前排名(降序,+1 变成从1开始)
rank = r.zrevrank(LEADERBOARD_KEY, player)
return rank + 1 if rank is not None else -1
def get_top_n(n: int = 10) -> List[Tuple[str, float]]:
"""获取 Top N 排行榜"""
# ZRANGE ... REV WITHSCORES:降序带分值(Redis 7.x 新语法)
result = r.zrange(LEADERBOARD_KEY, 0, n - 1,
rev=True, withscores=True)
return [(player, score) for player, score in result]
def get_player_rank(player: str) -> dict:
"""获取玩家的排名和分值"""
with r.pipeline() as pipe:
pipe.zrevrank(LEADERBOARD_KEY, player)
pipe.zscore(LEADERBOARD_KEY, player)
rank, score = pipe.execute()
return {
'player': player,
'rank': (rank + 1) if rank is not None else None,
'score': score
}
def get_nearby_players(player: str, radius: int = 2) -> List[Tuple]:
"""获取玩家附近排名的玩家(如排名±2名)"""
rank = r.zrevrank(LEADERBOARD_KEY, player)
if rank is None: return []
start = max(0, rank - radius)
end = rank + radius
return r.zrange(LEADERBOARD_KEY, start, end, rev=True, withscores=True)
# 测试
submit_score("alice", 9500)
submit_score("bob", 8800)
submit_score("charlie", 9200)
submit_score("dave", 7600)
print("Top 3:", get_top_n(3))
# [('alice', 9500.0), ('charlie', 9200.0), ('bob', 8800.0)]
print(get_player_rank("charlie"))
# {'player': 'charlie', 'rank': 2, 'score': 9200.0}
ZSet 分值精度:ZSet 的 score 是 64 位浮点数(IEEE 754 double)。当分值超过 2^53 时,整数精度会丢失。对于需要精确大整数的场景(如时间戳毫秒),应注意这个限制,或将时间戳缩放(如除以 1000 用秒级)。
延迟队列:ZSet 实现定时任务
ZSet 的 score 存储执行时间戳,实现"延迟队列"——任务在指定时间后才能被消费,常用于订单超时、消息重试等场景:
import redis, time, json
from typing import Optional
r = redis.Redis(host='localhost', decode_responses=True)
DELAY_QUEUE = "delay:queue"
def push_delayed(task: dict, delay_seconds: float) -> None:
"""将任务放入延迟队列,delay_seconds 秒后可被消费"""
execute_at = time.time() + delay_seconds # 执行时间戳作为 score
payload = json.dumps(task)
r.zadd(DELAY_QUEUE, {payload: execute_at})
def pop_ready() -> Optional[dict]:
"""取出到期的任务(score <= 当前时间戳)"""
now = time.time()
# ZRANGEBYSCORE 获取 score 在 [0, now] 范围内的任务
tasks = r.zrangebyscore(DELAY_QUEUE, 0, now, start=0, num=1)
if not tasks:
return None
# 使用 pipeline + ZREM 实现原子"取出并删除"
payload = tasks[0]
removed = r.zrem(DELAY_QUEUE, payload)
return json.loads(payload) if removed else None
# 使用示例:订单 30 分钟超时检查
push_delayed({'type': 'order_expire', 'order_id': '1001'}, delay_seconds=1800)
push_delayed({'type': 'retry_email', 'to': 'user@example.com'}, delay_seconds=300)
# 消费循环(每秒检查一次)
while True:
task = pop_ready()
if task:
print(f"执行任务: {task}")
time.sleep(1)
延迟队列的竞争问题:上面的 pop_ready() 实现存在竞态条件:多个消费者同时调用时可能处理同一个任务。生产环境应使用 Lua 脚本将 ZRANGEBYSCORE + ZREM 合并为一个原子操作,或使用 Redis Stream 的消费者组来保证每条消息只被一个消费者处理。
HyperLogLog:极低内存的基数统计
HyperLogLog 是一种概率算法,用约 12 KB 内存估算任意数量不重复元素的基数,误差率约 0.81%。适合不需要精确值的大规模 UV(独立用户)统计:
# 每次用户访问,将 user_id 加入当日 HyperLogLog
PFADD uv:2024-12-01 user:1001 user:1002 user:1003
PFADD uv:2024-12-01 user:1001 user:2000 # user:1001 重复,不计入
# 估算当日 UV
PFCOUNT uv:2024-12-01 # 约 4(误差 <1%)
# 合并多天数据,统计周 UV
PFMERGE uv:2024-week-49 uv:2024-12-01 uv:2024-12-02 uv:2024-12-07
PFCOUNT uv:2024-week-49
GEOADD 将经纬度编码为 52 位 GeoHash 存入 score,支持 GEODIST(距离计算)、GEOSEARCH(范围搜索)。适合附近的人、就近配送等场景。List、Set、ZSet 各有独特的底层结构和适用场景:
List:quicklist(双链表 + listpack 节点),O(1) 两端操作。适合消息队列(BLPOP)、时间线(最新N条),不适合随机访问。
Set:listpack(小集合)或 hashtable(大集合),O(1) 增删查,O(N) 集合运算。适合标签、好友关系、抽奖去重。
ZSet:listpack(小有序集合)或 skiplist + hashtable(大有序集合),O(log N) 操作。适合排行榜、延迟队列、范围查询。
选型口诀:需要排序用 ZSet,需要集合运算用 Set,需要队列语义用 List,需要简单存取用 String/Hash。