为什么需要消息队列
消息队列(Message Queue)是分布式系统中解决异步通信问题的核心中间件,主要解决三类场景:
异步解耦
用户下单后,订单系统只需发送一条消息,积分系统、短信系统、仓储系统各自异步处理,互不阻塞。即使某个下游服务宕机,消息会在队列中等待,不影响下单主流程。
流量削峰
秒杀活动瞬间涌入 10 万请求,后端每秒只能处理 1000 个订单。消息队列充当缓冲区,将请求排队,后端以自己的速度消费,避免数据库被打垮。
可靠通信
消息持久化到磁盘,消费者挂掉重启后消息不丢失。配合 ACK 机制,确保每条消息至少被成功处理一次(At Least Once 语义)。
核心概念名词解释
AMQP(Advanced Message Queuing Protocol)
高级消息队列协议,是 RabbitMQ 实现的核心协议。定义了消息格式、路由规则、确认机制等标准。RabbitMQ 是 AMQP 最广泛使用的实现之一。
Exchange(交换器)
消息的路由中枢。生产者将消息发送到 Exchange,Exchange 根据绑定规则和路由键决定将消息路由到哪个(些)Queue。Exchange 本身不存储消息。
Queue(队列)
消息的存储区。消息在 Queue 中持久化,等待消费者拉取。多个消费者可以监听同一个 Queue,RabbitMQ 默认使用轮询分发(Round-Robin)。
Binding(绑定)与 RoutingKey(路由键)
Binding 是 Exchange 和 Queue 之间的关联关系,创建时指定 RoutingKey(绑定键)。发送消息时也要指定 RoutingKey,Exchange 通过匹配规则决定路由目标。
ACK(消息确认)
消费者处理完消息后向 RabbitMQ 发送确认(ack),告知消息已成功处理可以删除。若消费者未发送 ack 就断开连接,RabbitMQ 会将消息重新入队分发给其他消费者。
死信队列(DLQ — Dead Letter Queue)
消息因被拒绝(nack)、超过最大重试次数或 TTL 过期而无法被正常消费时,会被路由到死信队列。死信队列中的消息通常需要人工干预或特殊处理逻辑。
Exchange 类型
| 类型 | 路由规则 | 使用场景 |
|---|---|---|
| Direct | RoutingKey 精确匹配绑定键 | 点对点消息,如订单状态通知 |
| Topic | RoutingKey 通配符匹配(* 匹配一个词,# 匹配多个词) | 按类别订阅,如 order.created、order.*.paid |
| Fanout | 广播,忽略 RoutingKey,所有绑定 Queue 都收到 | 系统通知、广播事件 |
| Headers | 根据消息头部属性匹配(不使用 RoutingKey) | 复杂路由逻辑,较少使用 |
消息从发送到消费的完整链路
┌──────────────────────────────────────────────────────────────────┐
│ RabbitMQ 消息流转完整链路 │
└──────────────────────────────────────────────────────────────────┘
【生产者端】
OrderService
│
│ rabbitTemplate.convertAndSend(exchange, routingKey, message)
▼
┌─────────────────────────────────────────┐
│ RabbitMQ Broker │
│ │
│ Exchange: order.events (topic) │
│ │ │
│ ├── routingKey: order.created ──→ Queue: order.created.queue
│ ├── routingKey: order.paid ──→ Queue: payment.queue
│ └── routingKey: order.* ──→ Queue: audit.queue(通配符)
│ │
│ 死信配置:消费失败 → DLX Exchange │
│ → dead.letter.queue │
└─────────────────────────────────────────┘
│ │
▼ ▼
InventoryConsumer NotificationConsumer
@RabbitListener @RabbitListener
│ │
├── 处理成功 → ack() ├── 处理成功 → ack()
└── 处理失败 → nack() └── 异常 → 进入死信队列
│
└── 重试 3 次后 → Dead Letter Queue
Spring Boot 集成 RabbitMQ
依赖配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Queue、Exchange、Binding 配置
@Configuration
public class RabbitConfig {
public static final String ORDER_QUEUE = "order.created.queue";
public static final String ORDER_EXCHANGE = "order.events";
public static final String ORDER_ROUTING_KEY = "order.created";
public static final String DLQ_NAME = "order.dead.letter.queue";
public static final String DLX_NAME = "order.dead.letter.exchange";
// 死信交换器
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DLX_NAME);
}
// 死信队列
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DLQ_NAME).build();
}
@Bean
public Binding dlqBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange()).with("order.created");
}
// 业务队列(绑定死信配置)
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(ORDER_QUEUE)
.deadLetterExchange(DLX_NAME) // 死信路由到 DLX
.deadLetterRoutingKey("order.created")
.ttl(60000) // 消息 TTL 60秒
.build();
}
@Bean
public TopicExchange orderExchange() {
return new TopicExchange(ORDER_EXCHANGE);
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange()).with(ORDER_ROUTING_KEY);
}
}
生产者:发送消息
@Service
@RequiredArgsConstructor
public class OrderService {
private final RabbitTemplate rabbitTemplate;
private final OrderRepository orderRepository;
@Transactional
public Order createOrder(CreateOrderRequest req) {
Order order = /* 创建订单并保存 */ orderRepository.save(buildOrder(req));
// 发送消息到 RabbitMQ(事务提交后发送更安全)
OrderCreatedEvent event = new OrderCreatedEvent(
order.getId(), order.getUserId(), order.getTotalAmount()
);
rabbitTemplate.convertAndSend(
RabbitConfig.ORDER_EXCHANGE,
RabbitConfig.ORDER_ROUTING_KEY,
event
);
return order;
}
}
消费者:处理消息(手动 ACK)
@Component
public class OrderCreatedConsumer {
@RabbitListener(queues = RabbitConfig.ORDER_QUEUE, ackMode = "MANUAL")
public void handleOrderCreated(
@Payload OrderCreatedEvent event,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
// 处理业务(幂等设计:用 orderId 去重)
processOrderCreated(event);
// 手动 ACK:通知 RabbitMQ 消息已成功处理
channel.basicAck(tag, false);
} catch (BusinessException e) {
// 业务异常:不重试,直接 nack 进入死信队列
log.error("业务处理失败,进入死信队列: {}", event, e);
channel.basicNack(tag, false, false); // requeue=false → 死信
} catch (Exception e) {
// 临时异常(网络、超时):重新入队重试
log.warn("临时异常,重新入队: {}", event, e);
channel.basicNack(tag, false, true); // requeue=true → 重试
}
}
}
Warning
消费者必须做幂等处理。RabbitMQ 的 At Least Once 语义意味着网络重传或消费者重启可能导致消息被重复投递。通常使用 orderId + 唯一约束、Redis Set 或幂等表来去重,确保相同消息处理多次结果一致。
Tip
生产环境建议开启消息持久化(durable=true + deliveryMode=2)和 Publisher Confirm(发布者确认),确保消息不因 Broker 故障而丢失。同时对死信队列设置告警,及时发现处理失败的消息。