发布订阅(Pub/Sub)
Pub/Sub 是 Redis 内置的消息广播机制。发布者(Publisher)向频道(Channel)发送消息,所有订阅该频道的订阅者(Subscriber)同时收到消息,实现解耦通信。
Pub/Sub 命令速查表
| 命令 | 语法 | 说明 |
|---|---|---|
SUBSCRIBE | SUBSCRIBE channel [channel ...] | 订阅指定频道 |
UNSUBSCRIBE | UNSUBSCRIBE [channel ...] | 取消订阅 |
PUBLISH | PUBLISH channel message | 向频道发布消息,返回接收者数量 |
PSUBSCRIBE | PSUBSCRIBE pattern [pattern ...] | 按 glob 模式订阅(如 news.*) |
PUNSUBSCRIBE | PUNSUBSCRIBE [pattern ...] | 取消模式订阅 |
PUBSUB CHANNELS | PUBSUB CHANNELS [pattern] | 列出活跃频道 |
PUBSUB NUMSUB | PUBSUB NUMSUB [channel ...] | 各频道的订阅者数量 |
SSUBSCRIBE | SSUBSCRIBE shardchannel | 分片订阅(Redis 7.x Cluster 优化) |
Pub/Sub Python 示例
import redis, threading, time
r = redis.Redis(host='localhost', decode_responses=True)
# 订阅者(在独立线程中运行)
def subscriber(channels: list):
pubsub = r.pubsub()
pubsub.subscribe(*channels)
print(f"订阅频道: {channels}")
for message in pubsub.listen():
if message['type'] == 'message':
print(f"收到消息 [{message['channel']}]: {message['data']}")
# 模式订阅(订阅所有 news.* 频道)
def pattern_subscriber():
pubsub = r.pubsub()
pubsub.psubscribe('news.*')
for message in pubsub.listen():
if message['type'] == 'pmessage':
print(f"模式匹配 [{message['channel']}]: {message['data']}")
# 启动订阅者线程
t1 = threading.Thread(target=subscriber, args=(['news.tech', 'news.sports'],), daemon=True)
t2 = threading.Thread(target=pattern_subscriber, daemon=True)
t1.start(); t2.start()
time.sleep(0.1)
# 发布消息
r.publish('news.tech', 'Redis 7.2 发布了!')
r.publish('news.sports', '世界杯决赛即将开始')
Pub/Sub 的三大局限:① 无消息持久化,订阅者离线期间的消息永久丢失;② 无消费确认(ACK),消息发出即"忘记";③ 不支持消费者组,一条消息会被所有订阅者收到(广播)。如果需要任意一种可靠性保障,应使用 Stream。
Stream:Redis 的持久化消息流
Redis 5.0 引入的 Stream 是一种仿 Kafka 设计的持久化日志数据结构,支持:消息持久化、消费者组(同一消息只被一个消费者处理)、待确认消息(PEL)、消息回溯。
Stream 命令速查表
| 命令 | 说明 |
|---|---|
XADD key * field value ... | 追加消息,* 表示自动生成 ID(时间戳-序号) |
XLEN key | 消息数量 |
XRANGE key start end [COUNT n] | 按 ID 范围读取,- 表示最小,+ 表示最大 |
XREVRANGE key end start [COUNT n] | 反向读取 |
XREAD [COUNT n] [BLOCK ms] STREAMS key ... id ... | 读取新消息,$ 表示只读最新,可阻塞 |
XGROUP CREATE key group id [MKSTREAM] | 创建消费者组,$ 表示从当前最新开始消费 |
XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS key > | 消费者组读取,> 表示未投递的新消息 |
XACK key group id [id ...] | 确认消息已处理(从 PEL 移除) |
XPENDING key group [start end count] | 查看待确认消息 |
XCLAIM key group consumer min-idle-time id | 转移长时间未确认的消息(消费者故障恢复) |
XDEL key id [id ...] | 删除消息 |
XTRIM key MAXLEN [~] count | 裁剪 Stream 长度(~ 允许近似裁剪) |
XINFO STREAM key | 查看 Stream 信息 |
消费者组完整示例(Python)
import redis, json, time, threading
r = redis.Redis(host='localhost', decode_responses=True)
STREAM_KEY = "stream:orders"
GROUP_NAME = "payment-service"
# 初始化:创建 Stream 和消费者组
def init_stream():
try:
# $ 表示只消费从现在开始的新消息
r.xgroup_create(STREAM_KEY, GROUP_NAME, id='$', mkstream=True)
print(f"消费者组 {GROUP_NAME} 创建成功")
except redis.exceptions.ResponseError as e:
if 'BUSYGROUP' in str(e):
print(f"消费者组 {GROUP_NAME} 已存在")
# 生产者:发布订单消息
def produce_order(order: dict) -> str:
msg_id = r.xadd(STREAM_KEY, order,
maxlen=10000) # 自动裁剪,保留最新 10000 条
print(f"[Producer] 消息ID: {msg_id}, 订单: {order}")
return msg_id
# 消费者:消费者组模式
def consume_orders(consumer_name: str):
print(f"[{consumer_name}] 开始消费")
while True:
try:
# XREADGROUP: 读取未投递的新消息,阻塞 2 秒
messages = r.xreadgroup(
GROUP_NAME, consumer_name,
{STREAM_KEY: '>'},
count=10,
block=2000
)
if not messages:
continue
for stream, msgs in messages:
for msg_id, data in msgs:
try:
process_order(data)
# 成功处理后 ACK(从待确认列表移除)
r.xack(STREAM_KEY, GROUP_NAME, msg_id)
print(f"[{consumer_name}] ACK: {msg_id}")
except Exception as e:
# 处理失败:消息留在 PEL,等待重试或转移
print(f"[{consumer_name}] 处理失败 {msg_id}: {e}")
except Exception as e:
print(f"[{consumer_name}] 错误: {e}")
time.sleep(1)
def process_order(data: dict):
time.sleep(0.05) # 模拟支付处理
# 故障恢复:转移超时未确认消息
def recover_pending(timeout_ms: int = 30000):
"""将超过 30 秒未确认的消息转移给其他消费者"""
pending = r.xpending_range(
STREAM_KEY, GROUP_NAME,
min='-', max='+', count=10,
idle=timeout_ms
)
for msg in pending:
r.xclaim(STREAM_KEY, GROUP_NAME,
'recovery-consumer', timeout_ms, msg['message_id'])
# 运行
init_stream()
for i in ['consumer-1', 'consumer-2']:
threading.Thread(target=consume_orders, args=(i,), daemon=True).start()
for i in range(10):
produce_order({'order_id': str(i), 'amount': str(i * 10)})
Stream vs Kafka:适用场景对比
| 特性 | Redis Stream | Apache Kafka |
|---|---|---|
| 消息持久化 | 内存+AOF/RDB(可能丢失) | 磁盘持久化(极低丢失风险) |
| 吞吐量 | 中(单节点 100K msg/s) | 极高(百万 msg/s,水平扩展) |
| 延迟 | 极低(亚毫秒) | 低(毫秒级) |
| 消费者组 | 支持(简单) | 支持(强大) |
| 消息回溯 | 支持(受内存限制) | 支持(可长期保留) |
| 运维复杂度 | 低(复用 Redis) | 高(ZooKeeper/KRaft+Kafka) |
| 适用场景 | 实时通知、小规模事件流、已有 Redis 的项目 | 大数据管道、日志聚合、事件溯源 |
选型建议:如果你的系统已经使用 Redis,且消息量在单节点承受范围内(<10万/秒),优先用 Stream 而非引入 Kafka,减少技术栈复杂度。当消息量超过 Redis 承受能力、需要长期消息保留(>7天)、或需要多数据中心复制时,才考虑 Kafka。
Pub/Sub vs Stream:深度对比
| 特性 | Pub/Sub | Stream |
|---|---|---|
| 消息持久化 | 否(实时广播) | 是(AOF/RDB 持久化) |
| 消费确认(ACK) | 否 | 是(XACK 命令) |
| 消费者组 | 否(所有订阅者都收到) | 是(组内每条消息只给一人) |
| 离线消息 | 丢失 | 消费者上线后可按 ID 回溯 |
| 消息堆积 | 不堆积(内存无压力) | 堆积占内存(需 XTRIM) |
| 适用场景 | 实时通知/在线聊天/配置广播 | 订单事件/日志收集/任务队列 |
Redis 7.x Sharded Pub/Sub
在 Redis Cluster 模式下,传统 Pub/Sub 的 PUBLISH 消息会广播到所有节点(每个节点都要处理),带来巨大的网络开销。Redis 7.0 引入的 Sharded Pub/Sub 将频道绑定到特定槽(slot),消息只在负责该槽的节点上传播:
# Sharded Pub/Sub(仅在 Cluster 模式下有意义)
# 发布:SPUBLISH 替代 PUBLISH
SPUBLISH mychannel "message"
# 订阅:SSUBSCRIBE 替代 SUBSCRIBE
SSUBSCRIBE mychannel
# Python 示例
from redis.cluster import RedisCluster
rc = RedisCluster(host='localhost', port=7001, decode_responses=True)
# 发布到分片频道(只路由到该频道所在槽的节点)
rc.spublish('notifications:user:1001', '{"type":"msg","content":"你好"}')
# 订阅(自动连接到负责该频道槽的节点)
sharded_pubsub = rc.pubsub()
sharded_pubsub.ssubscribe('notifications:user:1001')
Redis 提供两套消息传递机制,各有定位:
Pub/Sub:零延迟广播,无持久化,订阅者在线才能收到消息。适合实时性要求高、允许消息丢失的场景(在线通知、配置更新广播)。Cluster 环境使用 SSUBSCRIBE 替代以避免全集群广播。
Stream:持久化消息队列,支持消费者组、ACK、回溯、故障恢复(XCLAIM)。适合需要可靠投递的业务事件(订单、支付、日志)。
选型核心问题:"消息能丢吗?" — 能丢用 Pub/Sub,不能丢用 Stream;消息量极大且需要长期保留,考虑 Kafka。