List:双向链表与消息队列
Redis 的 List 是一个有序的字符串列表,支持从两端快速插入和弹出,天然适合队列(FIFO)和栈(LIFO)模式。
底层结构:quicklist
quicklist = 双向链表 + listpack 节点
head ←───────────────────────────────────────── tail
↓ ↓
┌────────────┐ ┌────────────┐ ┌────────────┐
│ quicknode │←──▶│ quicknode │←──▶│ quicknode │
│ ┌──────┐ │ │ ┌──────┐ │ │ ┌──────┐ │
│ │lpack │ │ │ │lpack │ │ │ │lpack │ │
│ │v1 v2 │ │ │ │v3 v4 │ │ │ │v5 v6 │ │
│ └──────┘ │ │ └──────┘ │ │ └──────┘ │
└────────────┘ └────────────┘ └────────────┘
每个节点最多存 128 个元素(list-max-listpack-size)
quicklist 兼顾了 linkedlist 的快速插删和 listpack 的内存紧凑性:两端操作 O(1),中间查找 O(N)(不常用)。
List 命令速查表
| 命令 | 语法 | 说明 | 复杂度 |
|---|---|---|---|
LPUSH | LPUSH key val [val ...] | 从左端插入(头部),返回列表长度 | O(1) |
RPUSH | RPUSH key val [val ...] | 从右端插入(尾部) | O(1) |
LPOP | LPOP key [count] | 从左端弹出 | O(N) |
RPOP | RPOP key [count] | 从右端弹出 | O(N) |
BLPOP | BLPOP key [key ...] timeout | 阻塞式左端弹出,队列为空时等待 | O(1) |
BRPOP | BRPOP key [key ...] timeout | 阻塞式右端弹出 | O(1) |
LRANGE | LRANGE key start stop | 获取范围内元素,-1 表示最后一个 | O(S+N) |
LLEN | LLEN key | 列表长度 | O(1) |
LINDEX | LINDEX key index | 获取指定索引的元素 | O(N) |
LINSERT | LINSERT key BEFORE|AFTER pivot val | 在某元素前/后插入 | O(N) |
LREM | LREM key count val | 删除 count 个值为 val 的元素 | O(N+M) |
LMOVE | LMOVE src dst LEFT|RIGHT LEFT|RIGHT | 原子移动元素到另一个列表(7.x) | O(1) |
消息队列模式(Python 实现)
import redis, json, time, threading
r = redis.Redis(host='localhost', decode_responses=True)
# 生产者:向队列推送任务
def producer(queue_name: str, task: dict) -> None:
payload = json.dumps(task, ensure_ascii=False)
r.rpush(queue_name, payload)
print(f"[Producer] 推送任务: {task}")
# 消费者:阻塞式消费(推荐,避免轮询浪费 CPU)
def consumer(queue_name: str) -> None:
print(f"[Consumer] 开始监听队列: {queue_name}")
while True:
# BLPOP:队列为空时阻塞等待,超时时间 0 表示永久等待
result = r.blpop(queue_name, timeout=5)
if result:
_, payload = result
task = json.loads(payload)
print(f"[Consumer] 处理任务: {task}")
process_task(task) # 实际处理逻辑
def process_task(task: dict) -> None:
time.sleep(0.1) # 模拟处理
print(f"[Worker] 完成: {task['id']}")
# 启动生产者和消费者
t = threading.Thread(target=consumer, args=('task:queue',), daemon=True)
t.start()
for i in range(5):
producer('task:queue', {'id': i, 'type': 'email', 'to': f'user{i}@test.com'})
List 消息队列的致命缺陷:消费者 BLPOP 弹出消息后如果进程崩溃,消息会丢失!List 没有 ACK 机制。生产环境应使用 Redis Stream(第6章)或专业消息队列(Kafka/RabbitMQ)。若必须用 List,可将弹出的消息同时备份到另一个"处理中"列表,处理完成后再删除。
Set:无序集合与集合运算
Set 是无重复元素的无序集合,支持高效的集合运算(交集、并集、差集),适合标签系统、好友关系、抽奖等场景。
Set 命令速查表
| 命令 | 语法 | 说明 |
|---|---|---|
SADD | SADD key member [member ...] | 添加元素,返回新增数量 |
SREM | SREM key member [member ...] | 删除元素 |
SMEMBERS | SMEMBERS key | 获取所有元素(大集合慎用) |
SISMEMBER | SISMEMBER key member | 判断元素是否存在 |
SMISMEMBER | SMISMEMBER key m1 m2 ... | 批量判断(7.x) |
SCARD | SCARD key | 集合元素数量 |
SPOP | SPOP key [count] | 随机弹出元素(抽奖) |
SRANDMEMBER | SRANDMEMBER key [count] | 随机获取元素(不删除) |
SUNION | SUNION key [key ...] | 并集 |
SINTER | SINTER key [key ...] | 交集(共同好友) |
SDIFF | SDIFF key [key ...] | 差集(我有你没有) |
SUNIONSTORE | SUNIONSTORE dest key [key ...] | 并集结果存入新 key |
SINTERSTORE | SINTERSTORE dest key [key ...] | 交集结果存入新 key |
SINTERCARD | SINTERCARD numkeys key ... [LIMIT n] | 返回交集基数(7.x,不返回元素) |
# 社交网络:共同好友计算
SADD friends:alice bob charlie dave eve
SADD friends:bob alice charlie frank
# 共同好友(交集)
SINTER friends:alice friends:bob
# 返回: {"charlie"}
# 推荐好友(bob 的好友中 alice 还没认识的)
SDIFF friends:bob friends:alice
# 返回: {"frank"}(排除了alice自己)
# 快速判断是否共同关注(交集基数,无需传输所有元素)
SINTERCARD 2 friends:alice friends:bob LIMIT 10
# 返回: 1
ZSet:有序集合与跳表
ZSet(Sorted Set)是 Redis 最独特的数据结构。每个元素有一个浮点数 score(分值),元素按 score 从小到大自动排序,同时支持按 score 范围查询和按 member 查询 score,复杂度均为 O(log N)。
底层实现:跳表(Skip List)
跳表结构(ZSet 大于 128 个元素时使用)
Level 3: header ─────────────────────────────────▶ alice(90) ──▶ NULL
Level 2: header ─────────────▶ bob(70) ────────────▶ alice(90) ──▶ NULL
Level 1: header ──▶ eve(50) ──▶ bob(70) ──▶ dave(80) ──▶ alice(90) ──▶ NULL
Level 0: header ──▶ eve(50) ──▶ bob(70) ──▶ dave(80) ──▶ alice(90) ──▶ NULL
查找 dave(80):从 Level 3 开始,不满足则下降,平均时间 O(log N)
相比红黑树:实现更简单,范围查询更高效(链表遍历 vs 中序遍历)
为何选择跳表而非红黑树?
- 范围查询效率:跳表底层是链表,
ZRANGEBYSCORE定位起点后直接顺序遍历,红黑树需要中序遍历 - 实现简单:跳表代码量约为红黑树的 1/3,更易维护和调试
- 内存局部性:跳表节点在插入顺序上连续,缓存命中率更高
ZSet 命令速查表
| 命令 | 语法 | 说明 |
|---|---|---|
ZADD | ZADD key [NX|XX] [GT|LT] [CH] score member ... | 添加/更新元素,GT/LT 仅在新分值更大/小时更新 |
ZSCORE | ZSCORE key member | 获取元素分值 |
ZINCRBY | ZINCRBY key delta member | 原子增加分值 |
ZRANK | ZRANK key member [WITHSCORE] | 获取排名(升序,从0开始) |
ZREVRANK | ZREVRANK key member [WITHSCORE] | 获取排名(降序) |
ZRANGE | ZRANGE key min max [BYSCORE|BYLEX] [REV] [LIMIT] [WITHSCORES] | 7.x 统一范围查询命令 |
ZRANGEBYSCORE | ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT] | 按分值范围查询(被 ZRANGE BYSCORE 替代) |
ZRANGEBYLEX | ZRANGEBYLEX key min max | 按字典序范围查询(分值相同时) |
ZCARD | ZCARD key | 元素总数 |
ZCOUNT | ZCOUNT key min max | 分值范围内元素数量 |
ZPOPMIN | ZPOPMIN key [count] | 弹出分值最小的元素 |
ZPOPMAX | ZPOPMAX key [count] | 弹出分值最大的元素 |
ZREM | ZREM key member [member ...] | 删除元素 |
ZREMRANGEBYSCORE | ZREMRANGEBYSCORE key min max | 删除分值范围内的元素 |
排行榜实现(完整代码)
import redis
from typing import List, Tuple
r = redis.Redis(host='localhost', decode_responses=True)
LEADERBOARD_KEY = "game:leaderboard"
def submit_score(player: str, score: float) -> int:
"""提交分数,GT 选项确保只有更高分才会更新"""
r.zadd(LEADERBOARD_KEY, {player: score}, gt=True)
# 获取当前排名(降序,+1 变成从1开始)
rank = r.zrevrank(LEADERBOARD_KEY, player)
return rank + 1 if rank is not None else -1
def get_top_n(n: int = 10) -> List[Tuple[str, float]]:
"""获取 Top N 排行榜"""
# ZRANGE ... REV WITHSCORES:降序带分值(Redis 7.x 新语法)
result = r.zrange(LEADERBOARD_KEY, 0, n - 1,
rev=True, withscores=True)
return [(player, score) for player, score in result]
def get_player_rank(player: str) -> dict:
"""获取玩家的排名和分值"""
with r.pipeline() as pipe:
pipe.zrevrank(LEADERBOARD_KEY, player)
pipe.zscore(LEADERBOARD_KEY, player)
rank, score = pipe.execute()
return {
'player': player,
'rank': (rank + 1) if rank is not None else None,
'score': score
}
def get_nearby_players(player: str, radius: int = 2) -> List[Tuple]:
"""获取玩家附近排名的玩家(如排名±2名)"""
rank = r.zrevrank(LEADERBOARD_KEY, player)
if rank is None: return []
start = max(0, rank - radius)
end = rank + radius
return r.zrange(LEADERBOARD_KEY, start, end, rev=True, withscores=True)
# 测试
submit_score("alice", 9500)
submit_score("bob", 8800)
submit_score("charlie", 9200)
submit_score("dave", 7600)
print("Top 3:", get_top_n(3))
# [('alice', 9500.0), ('charlie', 9200.0), ('bob', 8800.0)]
print(get_player_rank("charlie"))
# {'player': 'charlie', 'rank': 2, 'score': 9200.0}
ZSet 分值精度:ZSet 的 score 是 64 位浮点数(IEEE 754 double)。当分值超过 2^53 时,整数精度会丢失。对于需要精确大整数的场景(如时间戳毫秒),应注意这个限制,或将时间戳缩放(如除以 1000 用秒级)。