Chapter 06

发布订阅与 Stream

PubSub 的实时广播、Stream 的持久化消费者组——Redis 的两种消息传递模式

发布订阅(Pub/Sub)

Pub/Sub 是 Redis 内置的消息广播机制。发布者(Publisher)向频道(Channel)发送消息,所有订阅该频道的订阅者(Subscriber)同时收到消息,实现解耦通信。

Pub/Sub 模型 Publisher A ──PUBLISH news "新闻1"──▶ Channel: news ──▶ Subscriber 1 Publisher B ──PUBLISH news "新闻2"──▶ ──▶ Subscriber 2 ──▶ Subscriber 3 特点:实时广播,无持久化,订阅者离线期间的消息会丢失

Pub/Sub 命令速查表

命令语法说明
SUBSCRIBESUBSCRIBE channel [channel ...]订阅指定频道
UNSUBSCRIBEUNSUBSCRIBE [channel ...]取消订阅
PUBLISHPUBLISH channel message向频道发布消息,返回接收者数量
PSUBSCRIBEPSUBSCRIBE pattern [pattern ...]按 glob 模式订阅(如 news.*)
PUNSUBSCRIBEPUNSUBSCRIBE [pattern ...]取消模式订阅
PUBSUB CHANNELSPUBSUB CHANNELS [pattern]列出活跃频道
PUBSUB NUMSUBPUBSUB NUMSUB [channel ...]各频道的订阅者数量
SSUBSCRIBESSUBSCRIBE shardchannel分片订阅(Redis 7.x Cluster 优化)

Pub/Sub Python 示例

import redis, threading, time

r = redis.Redis(host='localhost', decode_responses=True)

# 订阅者(在独立线程中运行)
def subscriber(channels: list):
    pubsub = r.pubsub()
    pubsub.subscribe(*channels)
    print(f"订阅频道: {channels}")
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"收到消息 [{message['channel']}]: {message['data']}")

# 模式订阅(订阅所有 news.* 频道)
def pattern_subscriber():
    pubsub = r.pubsub()
    pubsub.psubscribe('news.*')
    for message in pubsub.listen():
        if message['type'] == 'pmessage':
            print(f"模式匹配 [{message['channel']}]: {message['data']}")

# 启动订阅者线程
t1 = threading.Thread(target=subscriber, args=(['news.tech', 'news.sports'],), daemon=True)
t2 = threading.Thread(target=pattern_subscriber, daemon=True)
t1.start(); t2.start()
time.sleep(0.1)

# 发布消息
r.publish('news.tech', 'Redis 7.2 发布了!')
r.publish('news.sports', '世界杯决赛即将开始')

Pub/Sub 的三大局限:① 无消息持久化,订阅者离线期间的消息永久丢失;② 无消费确认(ACK),消息发出即"忘记";③ 不支持消费者组,一条消息会被所有订阅者收到(广播)。如果需要任意一种可靠性保障,应使用 Stream。

Stream:Redis 的持久化消息流

Redis 5.0 引入的 Stream 是一种仿 Kafka 设计的持久化日志数据结构,支持:消息持久化、消费者组(同一消息只被一个消费者处理)、待确认消息(PEL)、消息回溯。

Stream 结构 Stream: orders ┌──────────────────────────────────────────────────────┐ │ ID: 1700000001-0 │ field: order_id value: 1001 │ │ │ field: amount value: 299.00 │ ├──────────────────────────────────────────────────────┤ │ ID: 1700000002-0 │ field: order_id value: 1002 │ │ │ field: amount value: 59.90 │ ├──────────────────────────────────────────────────────┤ │ ID: 1700000003-0 │ ... │ └──────────────────────────────────────────────────────┘ ↓ ↓ Consumer Group: payment Consumer Group: notification consumer1, consumer2 consumer3 (同组内一条消息只给一个消费者)(不同组各收一份)

Stream 命令速查表

命令说明
XADD key * field value ...追加消息,* 表示自动生成 ID(时间戳-序号)
XLEN key消息数量
XRANGE key start end [COUNT n]按 ID 范围读取,- 表示最小,+ 表示最大
XREVRANGE key end start [COUNT n]反向读取
XREAD [COUNT n] [BLOCK ms] STREAMS key ... id ...读取新消息,$ 表示只读最新,可阻塞
XGROUP CREATE key group id [MKSTREAM]创建消费者组,$ 表示从当前最新开始消费
XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS key >消费者组读取,> 表示未投递的新消息
XACK key group id [id ...]确认消息已处理(从 PEL 移除)
XPENDING key group [start end count]查看待确认消息
XCLAIM key group consumer min-idle-time id转移长时间未确认的消息(消费者故障恢复)
XDEL key id [id ...]删除消息
XTRIM key MAXLEN [~] count裁剪 Stream 长度(~ 允许近似裁剪)
XINFO STREAM key查看 Stream 信息

消费者组完整示例(Python)

import redis, json, time, threading

r = redis.Redis(host='localhost', decode_responses=True)
STREAM_KEY = "stream:orders"
GROUP_NAME = "payment-service"

# 初始化:创建 Stream 和消费者组
def init_stream():
    try:
        # $ 表示只消费从现在开始的新消息
        r.xgroup_create(STREAM_KEY, GROUP_NAME, id='$', mkstream=True)
        print(f"消费者组 {GROUP_NAME} 创建成功")
    except redis.exceptions.ResponseError as e:
        if 'BUSYGROUP' in str(e):
            print(f"消费者组 {GROUP_NAME} 已存在")

# 生产者:发布订单消息
def produce_order(order: dict) -> str:
    msg_id = r.xadd(STREAM_KEY, order,
                       maxlen=10000)  # 自动裁剪,保留最新 10000 条
    print(f"[Producer] 消息ID: {msg_id}, 订单: {order}")
    return msg_id

# 消费者:消费者组模式
def consume_orders(consumer_name: str):
    print(f"[{consumer_name}] 开始消费")
    while True:
        try:
            # XREADGROUP: 读取未投递的新消息,阻塞 2 秒
            messages = r.xreadgroup(
                GROUP_NAME, consumer_name,
                {STREAM_KEY: '>'},
                count=10,
                block=2000
            )
            if not messages:
                continue

            for stream, msgs in messages:
                for msg_id, data in msgs:
                    try:
                        process_order(data)
                        # 成功处理后 ACK(从待确认列表移除)
                        r.xack(STREAM_KEY, GROUP_NAME, msg_id)
                        print(f"[{consumer_name}] ACK: {msg_id}")
                    except Exception as e:
                        # 处理失败:消息留在 PEL,等待重试或转移
                        print(f"[{consumer_name}] 处理失败 {msg_id}: {e}")
        except Exception as e:
            print(f"[{consumer_name}] 错误: {e}")
            time.sleep(1)

def process_order(data: dict):
    time.sleep(0.05)  # 模拟支付处理

# 故障恢复:转移超时未确认消息
def recover_pending(timeout_ms: int = 30000):
    """将超过 30 秒未确认的消息转移给其他消费者"""
    pending = r.xpending_range(
        STREAM_KEY, GROUP_NAME,
        min='-', max='+', count=10,
        idle=timeout_ms
    )
    for msg in pending:
        r.xclaim(STREAM_KEY, GROUP_NAME,
                  'recovery-consumer', timeout_ms, msg['message_id'])

# 运行
init_stream()
for i in ['consumer-1', 'consumer-2']:
    threading.Thread(target=consume_orders, args=(i,), daemon=True).start()
for i in range(10):
    produce_order({'order_id': str(i), 'amount': str(i * 10)})

Stream vs Kafka:适用场景对比

特性Redis StreamApache Kafka
消息持久化内存+AOF/RDB(可能丢失)磁盘持久化(极低丢失风险)
吞吐量中(单节点 100K msg/s)极高(百万 msg/s,水平扩展)
延迟极低(亚毫秒)低(毫秒级)
消费者组支持(简单)支持(强大)
消息回溯支持(受内存限制)支持(可长期保留)
运维复杂度低(复用 Redis)高(ZooKeeper/KRaft+Kafka)
适用场景实时通知、小规模事件流、已有 Redis 的项目大数据管道、日志聚合、事件溯源

选型建议:如果你的系统已经使用 Redis,且消息量在单节点承受范围内(<10万/秒),优先用 Stream 而非引入 Kafka,减少技术栈复杂度。当消息量超过 Redis 承受能力、需要长期消息保留(>7天)、或需要多数据中心复制时,才考虑 Kafka。