Chapter 07

消息队列 RabbitMQ

AMQP 协议精要,Exchange/Queue/Binding 模型,消息确认机制,死信队列与延迟消息。

为什么需要消息队列

消息队列(Message Queue)是分布式系统中解决异步通信问题的核心中间件,主要解决三类场景:

异步解耦
用户下单后,订单系统只需发送一条消息,积分系统、短信系统、仓储系统各自异步处理,互不阻塞。即使某个下游服务宕机,消息会在队列中等待,不影响下单主流程。
流量削峰
秒杀活动瞬间涌入 10 万请求,后端每秒只能处理 1000 个订单。消息队列充当缓冲区,将请求排队,后端以自己的速度消费,避免数据库被打垮。
可靠通信
消息持久化到磁盘,消费者挂掉重启后消息不丢失。配合 ACK 机制,确保每条消息至少被成功处理一次(At Least Once 语义)。

核心概念名词解释

AMQP(Advanced Message Queuing Protocol)
高级消息队列协议,是 RabbitMQ 实现的核心协议。定义了消息格式、路由规则、确认机制等标准。RabbitMQ 是 AMQP 最广泛使用的实现之一。
Exchange(交换器)
消息的路由中枢。生产者将消息发送到 Exchange,Exchange 根据绑定规则和路由键决定将消息路由到哪个(些)Queue。Exchange 本身不存储消息。
Queue(队列)
消息的存储区。消息在 Queue 中持久化,等待消费者拉取。多个消费者可以监听同一个 Queue,RabbitMQ 默认使用轮询分发(Round-Robin)。
Binding(绑定)与 RoutingKey(路由键)
Binding 是 Exchange 和 Queue 之间的关联关系,创建时指定 RoutingKey(绑定键)。发送消息时也要指定 RoutingKey,Exchange 通过匹配规则决定路由目标。
ACK(消息确认)
消费者处理完消息后向 RabbitMQ 发送确认(ack),告知消息已成功处理可以删除。若消费者未发送 ack 就断开连接,RabbitMQ 会将消息重新入队分发给其他消费者。
死信队列(DLQ — Dead Letter Queue)
消息因被拒绝(nack)、超过最大重试次数或 TTL 过期而无法被正常消费时,会被路由到死信队列。死信队列中的消息通常需要人工干预或特殊处理逻辑。

Exchange 类型

类型路由规则使用场景
DirectRoutingKey 精确匹配绑定键点对点消息,如订单状态通知
TopicRoutingKey 通配符匹配(* 匹配一个词,# 匹配多个词)按类别订阅,如 order.created、order.*.paid
Fanout广播,忽略 RoutingKey,所有绑定 Queue 都收到系统通知、广播事件
Headers根据消息头部属性匹配(不使用 RoutingKey)复杂路由逻辑,较少使用

消息从发送到消费的完整链路

┌──────────────────────────────────────────────────────────────────┐ │ RabbitMQ 消息流转完整链路 │ └──────────────────────────────────────────────────────────────────┘ 【生产者端】 OrderService │ │ rabbitTemplate.convertAndSend(exchange, routingKey, message) ▼ ┌─────────────────────────────────────────┐ │ RabbitMQ Broker │ │ │ │ Exchange: order.events (topic) │ │ │ │ │ ├── routingKey: order.created ──→ Queue: order.created.queue │ ├── routingKey: order.paid ──→ Queue: payment.queue │ └── routingKey: order.* ──→ Queue: audit.queue(通配符) │ │ │ 死信配置:消费失败 → DLX Exchange │ │ → dead.letter.queue │ └─────────────────────────────────────────┘ │ │ ▼ ▼ InventoryConsumer NotificationConsumer @RabbitListener @RabbitListener │ │ ├── 处理成功 → ack() ├── 处理成功 → ack() └── 处理失败 → nack() └── 异常 → 进入死信队列 │ └── 重试 3 次后 → Dead Letter Queue

Spring Boot 集成 RabbitMQ

依赖配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Queue、Exchange、Binding 配置

@Configuration
public class RabbitConfig {

    public static final String ORDER_QUEUE = "order.created.queue";
    public static final String ORDER_EXCHANGE = "order.events";
    public static final String ORDER_ROUTING_KEY = "order.created";
    public static final String DLQ_NAME = "order.dead.letter.queue";
    public static final String DLX_NAME = "order.dead.letter.exchange";

    // 死信交换器
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DLX_NAME);
    }

    // 死信队列
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_NAME).build();
    }

    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(deadLetterQueue())
            .to(deadLetterExchange()).with("order.created");
    }

    // 业务队列(绑定死信配置)
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable(ORDER_QUEUE)
            .deadLetterExchange(DLX_NAME)      // 死信路由到 DLX
            .deadLetterRoutingKey("order.created")
            .ttl(60000)                          // 消息 TTL 60秒
            .build();
    }

    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange(ORDER_EXCHANGE);
    }

    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
            .to(orderExchange()).with(ORDER_ROUTING_KEY);
    }
}

生产者:发送消息

@Service
@RequiredArgsConstructor
public class OrderService {

    private final RabbitTemplate rabbitTemplate;
    private final OrderRepository orderRepository;

    @Transactional
    public Order createOrder(CreateOrderRequest req) {
        Order order = /* 创建订单并保存 */ orderRepository.save(buildOrder(req));

        // 发送消息到 RabbitMQ(事务提交后发送更安全)
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.getId(), order.getUserId(), order.getTotalAmount()
        );
        rabbitTemplate.convertAndSend(
            RabbitConfig.ORDER_EXCHANGE,
            RabbitConfig.ORDER_ROUTING_KEY,
            event
        );
        return order;
    }
}

消费者:处理消息(手动 ACK)

@Component
public class OrderCreatedConsumer {

    @RabbitListener(queues = RabbitConfig.ORDER_QUEUE, ackMode = "MANUAL")
    public void handleOrderCreated(
            @Payload OrderCreatedEvent event,
            Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            // 处理业务(幂等设计:用 orderId 去重)
            processOrderCreated(event);

            // 手动 ACK:通知 RabbitMQ 消息已成功处理
            channel.basicAck(tag, false);

        } catch (BusinessException e) {
            // 业务异常:不重试,直接 nack 进入死信队列
            log.error("业务处理失败,进入死信队列: {}", event, e);
            channel.basicNack(tag, false, false);  // requeue=false → 死信

        } catch (Exception e) {
            // 临时异常(网络、超时):重新入队重试
            log.warn("临时异常,重新入队: {}", event, e);
            channel.basicNack(tag, false, true);   // requeue=true → 重试
        }
    }
}
Warning 消费者必须做幂等处理。RabbitMQ 的 At Least Once 语义意味着网络重传或消费者重启可能导致消息被重复投递。通常使用 orderId + 唯一约束、Redis Set 或幂等表来去重,确保相同消息处理多次结果一致。
Tip 生产环境建议开启消息持久化(durable=true + deliveryMode=2)和 Publisher Confirm(发布者确认),确保消息不因 Broker 故障而丢失。同时对死信队列设置告警,及时发现处理失败的消息。