发布订阅(Pub/Sub)
Pub/Sub 是 Redis 内置的消息广播机制。发布者(Publisher)向频道(Channel)发送消息,所有订阅该频道的订阅者(Subscriber)同时收到消息,实现解耦通信。
Pub/Sub 模型
Publisher A ──PUBLISH news "新闻1"──▶ Channel: news ──▶ Subscriber 1
Publisher B ──PUBLISH news "新闻2"──▶ ──▶ Subscriber 2
──▶ Subscriber 3
特点:实时广播,无持久化,订阅者离线期间的消息会丢失
Pub/Sub 命令速查表
| 命令 | 语法 | 说明 |
|---|---|---|
SUBSCRIBE | SUBSCRIBE channel [channel ...] | 订阅指定频道 |
UNSUBSCRIBE | UNSUBSCRIBE [channel ...] | 取消订阅 |
PUBLISH | PUBLISH channel message | 向频道发布消息,返回接收者数量 |
PSUBSCRIBE | PSUBSCRIBE pattern [pattern ...] | 按 glob 模式订阅(如 news.*) |
PUNSUBSCRIBE | PUNSUBSCRIBE [pattern ...] | 取消模式订阅 |
PUBSUB CHANNELS | PUBSUB CHANNELS [pattern] | 列出活跃频道 |
PUBSUB NUMSUB | PUBSUB NUMSUB [channel ...] | 各频道的订阅者数量 |
SSUBSCRIBE | SSUBSCRIBE shardchannel | 分片订阅(Redis 7.x Cluster 优化) |
Pub/Sub Python 示例
import redis, threading, time
r = redis.Redis(host='localhost', decode_responses=True)
# 订阅者(在独立线程中运行)
def subscriber(channels: list):
pubsub = r.pubsub()
pubsub.subscribe(*channels)
print(f"订阅频道: {channels}")
for message in pubsub.listen():
if message['type'] == 'message':
print(f"收到消息 [{message['channel']}]: {message['data']}")
# 模式订阅(订阅所有 news.* 频道)
def pattern_subscriber():
pubsub = r.pubsub()
pubsub.psubscribe('news.*')
for message in pubsub.listen():
if message['type'] == 'pmessage':
print(f"模式匹配 [{message['channel']}]: {message['data']}")
# 启动订阅者线程
t1 = threading.Thread(target=subscriber, args=(['news.tech', 'news.sports'],), daemon=True)
t2 = threading.Thread(target=pattern_subscriber, daemon=True)
t1.start(); t2.start()
time.sleep(0.1)
# 发布消息
r.publish('news.tech', 'Redis 7.2 发布了!')
r.publish('news.sports', '世界杯决赛即将开始')
Pub/Sub 的三大局限:① 无消息持久化,订阅者离线期间的消息永久丢失;② 无消费确认(ACK),消息发出即"忘记";③ 不支持消费者组,一条消息会被所有订阅者收到(广播)。如果需要任意一种可靠性保障,应使用 Stream。
Stream:Redis 的持久化消息流
Redis 5.0 引入的 Stream 是一种仿 Kafka 设计的持久化日志数据结构,支持:消息持久化、消费者组(同一消息只被一个消费者处理)、待确认消息(PEL)、消息回溯。
Stream 结构
Stream: orders
↓
┌──────────────────────────────────────────────────────┐
│ ID: 1700000001-0 │ field: order_id value: 1001 │
│ │ field: amount value: 299.00 │
├──────────────────────────────────────────────────────┤
│ ID: 1700000002-0 │ field: order_id value: 1002 │
│ │ field: amount value: 59.90 │
├──────────────────────────────────────────────────────┤
│ ID: 1700000003-0 │ ... │
└──────────────────────────────────────────────────────┘
↓ ↓
Consumer Group: payment Consumer Group: notification
consumer1, consumer2 consumer3
(同组内一条消息只给一个消费者)(不同组各收一份)
Stream 命令速查表
| 命令 | 说明 |
|---|---|
XADD key * field value ... | 追加消息,* 表示自动生成 ID(时间戳-序号) |
XLEN key | 消息数量 |
XRANGE key start end [COUNT n] | 按 ID 范围读取,- 表示最小,+ 表示最大 |
XREVRANGE key end start [COUNT n] | 反向读取 |
XREAD [COUNT n] [BLOCK ms] STREAMS key ... id ... | 读取新消息,$ 表示只读最新,可阻塞 |
XGROUP CREATE key group id [MKSTREAM] | 创建消费者组,$ 表示从当前最新开始消费 |
XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS key > | 消费者组读取,> 表示未投递的新消息 |
XACK key group id [id ...] | 确认消息已处理(从 PEL 移除) |
XPENDING key group [start end count] | 查看待确认消息 |
XCLAIM key group consumer min-idle-time id | 转移长时间未确认的消息(消费者故障恢复) |
XDEL key id [id ...] | 删除消息 |
XTRIM key MAXLEN [~] count | 裁剪 Stream 长度(~ 允许近似裁剪) |
XINFO STREAM key | 查看 Stream 信息 |
消费者组完整示例(Python)
import redis, json, time, threading
r = redis.Redis(host='localhost', decode_responses=True)
STREAM_KEY = "stream:orders"
GROUP_NAME = "payment-service"
# 初始化:创建 Stream 和消费者组
def init_stream():
try:
# $ 表示只消费从现在开始的新消息
r.xgroup_create(STREAM_KEY, GROUP_NAME, id='$', mkstream=True)
print(f"消费者组 {GROUP_NAME} 创建成功")
except redis.exceptions.ResponseError as e:
if 'BUSYGROUP' in str(e):
print(f"消费者组 {GROUP_NAME} 已存在")
# 生产者:发布订单消息
def produce_order(order: dict) -> str:
msg_id = r.xadd(STREAM_KEY, order,
maxlen=10000) # 自动裁剪,保留最新 10000 条
print(f"[Producer] 消息ID: {msg_id}, 订单: {order}")
return msg_id
# 消费者:消费者组模式
def consume_orders(consumer_name: str):
print(f"[{consumer_name}] 开始消费")
while True:
try:
# XREADGROUP: 读取未投递的新消息,阻塞 2 秒
messages = r.xreadgroup(
GROUP_NAME, consumer_name,
{STREAM_KEY: '>'},
count=10,
block=2000
)
if not messages:
continue
for stream, msgs in messages:
for msg_id, data in msgs:
try:
process_order(data)
# 成功处理后 ACK(从待确认列表移除)
r.xack(STREAM_KEY, GROUP_NAME, msg_id)
print(f"[{consumer_name}] ACK: {msg_id}")
except Exception as e:
# 处理失败:消息留在 PEL,等待重试或转移
print(f"[{consumer_name}] 处理失败 {msg_id}: {e}")
except Exception as e:
print(f"[{consumer_name}] 错误: {e}")
time.sleep(1)
def process_order(data: dict):
time.sleep(0.05) # 模拟支付处理
# 故障恢复:转移超时未确认消息
def recover_pending(timeout_ms: int = 30000):
"""将超过 30 秒未确认的消息转移给其他消费者"""
pending = r.xpending_range(
STREAM_KEY, GROUP_NAME,
min='-', max='+', count=10,
idle=timeout_ms
)
for msg in pending:
r.xclaim(STREAM_KEY, GROUP_NAME,
'recovery-consumer', timeout_ms, msg['message_id'])
# 运行
init_stream()
for i in ['consumer-1', 'consumer-2']:
threading.Thread(target=consume_orders, args=(i,), daemon=True).start()
for i in range(10):
produce_order({'order_id': str(i), 'amount': str(i * 10)})
Stream vs Kafka:适用场景对比
| 特性 | Redis Stream | Apache Kafka |
|---|---|---|
| 消息持久化 | 内存+AOF/RDB(可能丢失) | 磁盘持久化(极低丢失风险) |
| 吞吐量 | 中(单节点 100K msg/s) | 极高(百万 msg/s,水平扩展) |
| 延迟 | 极低(亚毫秒) | 低(毫秒级) |
| 消费者组 | 支持(简单) | 支持(强大) |
| 消息回溯 | 支持(受内存限制) | 支持(可长期保留) |
| 运维复杂度 | 低(复用 Redis) | 高(ZooKeeper/KRaft+Kafka) |
| 适用场景 | 实时通知、小规模事件流、已有 Redis 的项目 | 大数据管道、日志聚合、事件溯源 |
选型建议:如果你的系统已经使用 Redis,且消息量在单节点承受范围内(<10万/秒),优先用 Stream 而非引入 Kafka,减少技术栈复杂度。当消息量超过 Redis 承受能力、需要长期消息保留(>7天)、或需要多数据中心复制时,才考虑 Kafka。