【消息队列】【从入门到可以】
# 消息队列深度实战:从入门到精通
## 目录
1. [MQ入门:为什么需要消息队列](#1-mq入门为什么需要消息队列)
2. [MQ基础概念与核心原理](#2-mq基础概念与核心原理)
3. [快速入门:RabbitMQ实战](#3-快速入门rabbitmq实战)
4. [项目实战一:电商订单系统](#4-项目实战一电商订单系统)
5. [项目实战二:日志收集系统](#5-项目实战二日志收集系统)
6. [项目实战三:分布式事务处理](#6-项目实战三分布式事务处理)
7. [高阶应用:消息可靠性保证](#7-高阶应用消息可靠性保证)
8. [性能优化与监控](#8-性能优化与监控)
9. [多MQ对比与选型](#9-多mq对比与选型)
10. [总结与最佳实践](#10-总结与最佳实践)
---
## 1. MQ入门:为什么需要消息队列
### 1.1 一个真实的业务场景
想象一个电商系统的下单流程:
```
用户下单 → 创建订单 → 扣减库存 → 发送通知 → 更新积分 → 记录日志
```
#### 场景1:同步调用的问题
```java
/**
* 传统同步调用方式
* 问题:每个环节都依赖前一个环节,一个环节失败,整个流程失败
*/
public class OrderServiceSync {
public void placeOrder(Order order) {
// 1. 创建订单
orderDao.save(order);
// 2. 扣减库存(可能因为网络原因超时)
inventoryService.deduct(order.getProductId(), order.getQuantity());
// 3. 发送通知(短信服务可能宕机)
notificationService.send(order.getUserId(), "订单创建成功");
// 4. 更新积分(积分系统可能维护中)
pointsService.add(order.getUserId(), order.getAmount());
// 5. 记录日志
logService.record(order);
}
}
```
**问题分析:**
- 耦合度高:订单系统强依赖库存、通知、积分等系统
- 性能差:总响应时间 = 所有服务响应时间之和
- 可用性低:任何一环失败,整个流程失败
- 扩展性差:难以独立扩展各个服务
#### 场景2:使用MQ后的改进
```java
/**
* 使用MQ异步解耦
*/
public class OrderServiceAsync {
@Autowired
private RabbitTemplate rabbitTemplate;
public void placeOrder(Order order) {
// 1. 创建订单(核心业务,必须同步)
orderDao.save(order);
// 2. 发送消息到MQ(异步处理)
rabbitTemplate.convertAndSend("order.exchange", "order.created", order);
// 立即返回,后续处理异步完成
}
}
/**
* 库存服务监听消息
*/
@RabbitListener(queues = "order.inventory.queue")
public class InventoryConsumer {
public void handleOrderCreated(Order order) {
// 异步扣减库存
inventoryService.deduct(order.getProductId(), order.getQuantity());
}
}
/**
* 通知服务监听消息
*/
@RabbitListener(queues = "order.notification.queue")
public class NotificationConsumer {
public void handleOrderCreated(Order order) {
// 异步发送通知
notificationService.send(order.getUserId(), "订单创建成功");
}
}
```
**改进效果:**
- 解耦:订单系统不依赖其他系统
- 性能提升:主流程响应时间大幅减少
- 可用性提高:某个服务故障不影响订单创建
- 易于扩展:可以独立扩展消费者
### 1.2 MQ的核心价值
```java
/**
* MQ核心价值演示
*/
public class MQValueDemo {
/**
* 1. 异步处理
* 场景:用户注册后发送欢迎邮件、初始化数据、赠送积分
*/
public void asyncProcessing() {
// 传统方式:用户需要等待所有操作完成
// 注册(100ms) + 发送邮件(200ms) + 初始化数据(150ms) + 赠送积分(100ms) = 550ms
// MQ方式:用户只需等待注册完成
// 注册(100ms) = 100ms
// 其他操作异步执行,不影响用户体验
}
/**
* 2. 应用解耦
* 场景:订单系统需要通知多个下游系统
*/
public void decoupling() {
// 传统方式:订单系统需要知道所有下游系统的接口
// orderService.notify(inventoryService, paymentService, logisticsService, ...)
// MQ方式:订单系统只需发送消息
// 不需要知道有哪些消费者,也不关心消费者如何处理
}
/**
* 3. 流量削峰
* 场景:秒杀系统,瞬时流量巨大
*/
public void peakShaving() {
// 传统方式:瞬时100万请求直接打到数据库,数据库崩溃
// 数据库QPS: 5000
// MQ方式:请求先进入MQ,消费者按自己的能力处理
// MQ可以承受100万请求/秒
// 数据库QPS: 5000(由消费者控制)
}
}
```
---
## 2. MQ基础概念与核心原理
### 2.1 核心概念
```java
/**
* MQ核心概念
*/
public class MQCoreConcepts {
/**
* 生产者(Producer)
* - 消息的发送方
* - 负责创建和发送消息
*/
public static class Producer {
public void send(String message) {
// 发送消息到MQ
}
}
/**
* 消费者(Consumer)
* - 消息的接收方
* - 负责处理消息
*/
public static class Consumer {
@RabbitListener(queues = "test.queue")
public void receive(String message) {
// 处理消息
}
}
/**
* 队列(Queue)
* - 消息的容器
* - 先进先出(FIFO)
*/
public static class Queue {
private List messages = new LinkedList<>();
public void enqueue(Message message) {
messages.add(message);
}
public Message dequeue() {
return messages.isEmpty() ? null : messages.remove(0);
}
}
/**
* 交换机(Exchange)
* - 消息路由器
* - 根据路由规则将消息发送到队列
*/
public static class Exchange {
public enum ExchangeType {
DIRECT, // 直连交换机
TOPIC, // 主题交换机
FANOUT, // 扇出交换机
HEADERS // 头交换机
}
private ExchangeType type;
private Map bindings = new HashMap<>();
public void route(String routingKey, Message message) {
// 根据路由规则路由消息
}
}
/**
* 绑定(Binding)
* - 交换机和队列之间的关联
* - 包含路由键
*/
public static class Binding {
private String queueName;
private String routingKey;
public Binding(String queueName, String routingKey) {
this.queueName = queueName;
this.routingKey = routingKey;
}
}
}
```
### 2.2 消息模型
```java
/**
* 消息模型
*/
public class MessageModels {
/**
* 1. 点对点模型(Point-to-Point)
* - 一个消息只能被一个消费者消费
* - 消息被消费后从队列中删除
*/
public static class PointToPointModel {
public void demo() {
// 生产者
Producer producer = new Producer();
producer.send("Hello World");
// 消费者1
Consumer consumer1 = new Consumer();
consumer1.receive(); // 收到消息
// 消费者2
Consumer consumer2 = new Consumer();
consumer2.receive(); // 收不到消息(已被消费)
}
}
/**
* 2. 发布订阅模型(Publish-Subscribe)
* - 一个消息可以被多个消费者消费
* - 每个消费者都能收到消息的副本
*/
public static class PublishSubscribeModel {
public void demo() {
// 生产者
Producer producer = new Producer();
producer.send("Hello World");
// 消费者1
Consumer consumer1 = new Consumer();
consumer1.receive(); // 收到消息
// 消费者2
Consumer consumer2 = new Consumer();
consumer2.receive(); // 也能收到消息
}
}
}
```
### 2.3 消息流转过程
```java
/**
* 消息流转过程
*/
public class MessageFlow {
/**
* 完整的消息流转过程
*/
public static class MessageLifecycle {
public void flow() {
// 1. 生产者创建消息
Message message = new Message();
message.setBody("Hello World");
message.setHeaders(Map.of("contentType", "text/plain"));
// 2. 生产者发送消息到交换机
Producer producer = new Producer();
producer.send("exchange1", "routing.key.1", message);
// 3. 交换机根据路由键和绑定关系路由消息
Exchange exchange = getExchange("exchange1");
List queues = exchange.route("routing.key.1", message);
// 4. 消息被投递到队列
for (Queue queue : queues) {
queue.enqueue(message);
}
// 5. 消费者从队列获取消息
Consumer consumer = new Consumer();
Message receivedMessage = consumer.receive("queue1");
// 6. 消费者处理消息
processMessage(receivedMessage);
// 7. 消费者确认消息
consumer.ack(receivedMessage);
// 8. 消息从队列中删除
}
}
}
```
---
## 3. 快速入门:RabbitMQ实战
### 3.1 RabbitMQ安装与配置
```bash
# Docker方式安装RabbitMQ
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin123 \
rabbitmq:3.12-management
# 访问管理界面
# http://localhost:15672
# 用户名: admin
# 密码: admin123
```
### 3.2 Spring Boot集成
```xml
org.springframework.boot
spring-boot-starter-amqp
```
```yaml
# application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin123
virtual-host: /
# 消息确认配置
publisher-confirm-type: correlated # 发布确认
publisher-returns: true # 发布返回
listener:
simple:
acknowledge-mode: manual # 手动确认
prefetch: 1 # 公平分发
retry:
enabled: true
max-attempts: 3
```
### 3.3 第一个MQ程序
```java
/**
* RabbitMQ配置类
*/
@Configuration
public class RabbitMQConfig {
/**
* 声明队列
*/
@Bean
public Queue helloQueue() {
// durable: 是否持久化
// exclusive: 是否独占
// autoDelete: 是否自动删除
return new Queue("hello.queue", true, false, false);
}
/**
* 声明交换机
*/
@Bean
public DirectExchange helloExchange() {
return new DirectExchange("hello.exchange", true, false);
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding helloBinding() {
return BindingBuilder
.bind(helloQueue())
.to(helloExchange())
.with("hello.routing.key");
}
}
```
```java
/**
* 消息生产者
*/
@Service
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送简单消息
*/
public void sendHelloMessage() {
String message = "Hello RabbitMQ!";
rabbitTemplate.convertAndSend(
"hello.exchange",
"hello.routing.key",
message
);
System.out.println("发送消息: " + message);
}
/**
* 发送对象消息
*/
public void sendOrderMessage(Order order) {
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
order
);
System.out.println("发送订单消息: " + order.getId());
}
}
```
```java
/**
* 消息消费者
*/
@Component
public class MessageConsumer {
/**
* 监听队列
*/
@RabbitListener(queues = "hello.queue")
public void receiveHelloMessage(String message) {
System.out.println("收到消息: " + message);
}
/**
* 监听订单队列
*/
@RabbitListener(queues = "order.queue")
public void receiveOrderMessage(Order order) {
System.out.println("收到订单: " + order.getId());
// 处理订单逻辑
}
}
```
### 3.4 测试控制器
```java
/**
* 测试控制器
*/
@RestController
@RequestMapping("/mq")
public class MQTestController {
@Autowired
private MessageProducer messageProducer;
/**
* 发送简单消息
*/
@GetMapping("/send")
public String sendMessage() {
messageProducer.sendHelloMessage();
return "消息发送成功";
}
/**
* 发送订单消息
*/
@PostMapping("/order")
public String createOrder(@RequestBody Order order) {
messageProducer.sendOrderMessage(order);
return "订单消息发送成功";
}
}
```
---
## 4. 项目实战一:电商订单系统
### 4.1 项目背景
构建一个完整的电商订单系统,实现下单后异步处理以下业务:
- 扣减库存
- 发送通知(短信、邮件)
- 更新积分
- 记录日志
### 4.2 项目结构
```
ecommerce-order-system/
├── src/main/java/
│ ├── com.example.order/
│ │ ├── config/ # 配置类
│ │ ├── controller/ # 控制器
│ │ ├── service/ # 服务层
│ │ ├── mq/ # MQ相关
│ │ │ ├── producer/ # 消息生产者
│ │ │ ├── consumer/ # 消息消费者
│ │ │ └── config/ # MQ配置
│ │ └── model/ # 数据模型
└── src/main/resources/
└── application.yml
```
### 4.3 数据模型
```java
/**
* 订单实体
*/
@Data
@Entity
@Table(name = "t_order")
public class Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String orderNo;
private Long userId;
private Long productId;
private Integer quantity;
private BigDecimal amount;
private Integer status; // 0:待支付, 1:已支付, 2:已发货, 3:已完成
private LocalDateTime createTime;
private LocalDateTime updateTime;
}
```
```java
/**
* 库存实体
*/
@Data
@Entity
@Table(name = "t_inventory")
public class Inventory {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long productId;
private Integer stock;
private Integer version; // 乐观锁版本号
}
```
### 4.4 MQ配置
```java
/**
* RabbitMQ配置
*/
@Configuration
public class OrderMQConfig {
/**
* 订单交换机
*/
@Bean
public TopicExchange orderExchange() {
return new TopicExchange("order.exchange", true, false);
}
/**
* 库存队列
*/
@Bean
public Queue inventoryQueue() {
return new Queue("order.inventory.queue", true);
}
/**
* 通知队列
*/
@Bean
public Queue notificationQueue() {
return new Queue("order.notification.queue", true);
}
/**
* 积分队列
*/
@Bean
public Queue pointsQueue() {
return new Queue("order.points.queue", true);
}
/**
* 日志队列
*/
@Bean
public Queue logQueue() {
return new Queue("order.log.queue", true);
}
/**
* 绑定库存队列
*/
@Bean
public Binding inventoryBinding() {
return BindingBuilder
.bind(inventoryQueue())
.to(orderExchange())
.with("order.inventory.#");
}
/**
* 绑定通知队列
*/
@Bean
public Binding notificationBinding() {
return BindingBuilder
.bind(notificationQueue())
.to(orderExchange())
.with("order.notification.#");
}
/**
* 绑定积分队列
*/
@Bean
public Binding pointsBinding() {
return BindingBuilder
.bind(pointsQueue())
.to(orderExchange())
.with("order.points.#");
}
/**
* 绑定日志队列
*/
@Bean
public Binding logBinding() {
return BindingBuilder
.bind(logQueue())
.to(orderExchange())
.with("order.log.#");
}
}
```
### 4.5 订单服务
```java
/**
* 订单服务
*/
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private OrderMQProducer orderMQProducer;
/**
* 创建订单
*/
@Transactional
public Order createOrder(OrderDTO orderDTO) {
// 1. 创建订单
Order order = new Order();
order.setOrderNo(generateOrderNo());
order.setUserId(orderDTO.getUserId());
order.setProductId(orderDTO.getProductId());
order.setQuantity(orderDTO.getQuantity());
order.setAmount(orderDTO.getAmount());
order.setStatus(0);
order.setCreateTime(LocalDateTime.now());
order.setUpdateTime(LocalDateTime.now());
order = orderRepository.save(order);
// 2. 发送订单创建消息
orderMQProducer.sendOrderCreated(order);
return order;
}
/**
* 支付订单
*/
@Transactional
public void payOrder(String orderNo) {
Order order = orderRepository.findByOrderNo(orderNo);
if (order == null) {
throw new BusinessException("订单不存在");
}
order.setStatus(1);
order.setUpdateTime(LocalDateTime.now());
orderRepository.save(order);
// 发送订单支付消息
orderMQProducer.sendOrderPaid(order);
}
/**
* 生成订单号
*/
private String generateOrderNo() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))
+ String.format("%04d", new Random().nextInt(10000));
}
}
```
### 4.6 消息生产者
```java
/**
* 订单消息生产者
*/
@Component
public class OrderMQProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送订单创建消息
*/
public void sendOrderCreated(Order order) {
// 发送到库存队列
rabbitTemplate.convertAndSend(
"order.exchange",
"order.inventory.created",
order
);
// 发送到通知队列
rabbitTemplate.convertAndSend(
"order.exchange",
"order.notification.created",
order
);
// 发送到日志队列
rabbitTemplate.convertAndSend(
"order.exchange",
"order.log.created",
order
);
System.out.println("订单创建消息已发送: " + order.getOrderNo());
}
/**
* 发送订单支付消息
*/
public void sendOrderPaid(Order order) {
// 发送到积分队列
rabbitTemplate.convertAndSend(
"order.exchange",
"order.points.paid",
order
);
// 发送到通知队列
rabbitTemplate.convertAndSend(
"order.exchange",
"order.notification.paid",
order
);
System.out.println("订单支付消息已发送: " + order.getOrderNo());
}
}
```
### 4.7 消息消费者
```java
/**
* 库存消费者
*/
@Component
@Slf4j
public class InventoryConsumer {
@Autowired
private InventoryService inventoryService;
/**
* 处理订单创建,扣减库存
*/
@RabbitListener(queues = "order.inventory.queue")
public void handleOrderCreated(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
log.info("收到订单创建消息,开始扣减库存: {}", order.getOrderNo());
// 扣减库存
inventoryService.deductStock(order.getProductId(), order.getQuantity());
log.info("库存扣减成功: {}", order.getOrderNo());
// 手动确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("库存扣减失败: {}", order.getOrderNo(), e);
try {
// 拒绝消息,重新入队
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
log.error("消息拒绝失败", ex);
}
}
}
}
```
```java
/**
* 通知消费者
*/
@Component
@Slf4j
public class NotificationConsumer {
@Autowired
private NotificationService notificationService;
/**
* 处理订单创建,发送通知
*/
@RabbitListener(queues = "order.notification.queue")
public void handleOrderCreated(Order order) {
try {
log.info("收到订单创建消息,开始发送通知: {}", order.getOrderNo());
// 发送短信通知
notificationService.sendSms(order.getUserId(),
"您的订单" + order.getOrderNo() + "已创建成功");
// 发送邮件通知
notificationService.sendEmail(order.getUserId(),
"订单创建成功", "您的订单" + order.getOrderNo() + "已创建成功");
log.info("通知发送成功: {}", order.getOrderNo());
} catch (Exception e) {
log.error("通知发送失败: {}", order.getOrderNo(), e);
}
}
}
```
```java
/**
* 积分消费者
*/
@Component
@Slf4j
public class PointsConsumer {
@Autowired
private PointsService pointsService;
/**
* 处理订单支付,更新积分
*/
@RabbitListener(queues = "order.points.queue")
public void handleOrderPaid(Order order) {
try {
log.info("收到订单支付消息,开始更新积分: {}", order.getOrderNo());
// 计算积分(1元=1积分)
int points = order.getAmount().multiply(BigDecimal.TEN).intValue();
// 更新积分
pointsService.addPoints(order.getUserId(), points);
log.info("积分更新成功: {}, 积分: {}", order.getOrderNo(), points);
} catch (Exception e) {
log.error("积分更新失败: {}", order.getOrderNo(), e);
}
}
}
```
### 4.8 测试
```java
/**
* 订单测试
*/
@SpringBootTest
public class OrderServiceTest {
@Autowired
private OrderService orderService;
@Test
public void testCreateOrder() {
OrderDTO orderDTO = new OrderDTO();
orderDTO.setUserId(1L);
orderDTO.setProductId(100L);
orderDTO.setQuantity(1);
orderDTO.setAmount(new BigDecimal("99.99"));
Order order = orderService.createOrder(orderDTO);
System.out.println("订单创建成功: " + order.getOrderNo());
// 等待异步处理完成
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
```
---
## 5. 项目实战二:日志收集系统
### 5.1 项目背景
构建一个分布式日志收集系统,实现以下功能:
- 各服务异步发送日志到MQ
- 日志服务从MQ消费日志
- 日志存储到Elasticsearch
- 支持日志查询和分析
### 5.2 架构设计
```
应用服务 → RabbitMQ → 日志服务 → Elasticsearch
↓
死信队列(处理失败消息)
```
### 5.3 日志模型
```java
/**
* 日志实体
*/
@Data
public class LogMessage {
/**
* 应用名称
*/
private String appName;
/**
* 日志级别
*/
private String level;
/**
* 日志内容
*/
private String message;
/**
* 异常堆栈
*/
private String stackTrace;
/**
* 线程名称
*/
private String threadName;
/**
* 类名
*/
private String className;
/**
* 方法名
*/
private String methodName;
/**
* 时间戳
*/
private Long timestamp;
/**
* TraceId(链路追踪)
*/
private String traceId;
/**
* 自定义字段
*/
private Map fields;
}
```
### 5.4 MQ配置
```java
/**
* 日志MQ配置
*/
@Configuration
public class LogMQConfig {
/**
* 日志交换机
*/
@Bean
public TopicExchange logExchange() {
return new TopicExchange("log.exchange", true, false);
}
/**
* 日志队列
*/
@Bean
public Queue logQueue() {
return QueueBuilder
.durable("log.queue")
// 绑定死信交换机
.withArgument("x-dead-letter-exchange", "log.dlx.exchange")
.withArgument("x-dead-letter-routing-key", "log.dlx")
// 设置消息TTL(24小时)
.withArgument("x-message-ttl", 86400000)
.build();
}
/**
* 死信交换机
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("log.dlx.exchange", true, false);
}
/**
* 死信队列
*/
@Bean
public Queue deadLetterQueue() {
return new Queue("log.dlx.queue", true);
}
/**
* 绑定日志队列
*/
@Bean
public Binding logBinding() {
return BindingBuilder
.bind(logQueue())
.to(logExchange())
.with("log.#");
}
/**
* 绑定死信队列
*/
@Bean
public Binding deadLetterBinding() {
return BindingBuilder
.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("log.dlx");
}
}
```
### 5.5 日志生产者
```java
/**
* 日志生产者
*/
@Component
@Slf4j
public class LogProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送日志消息
*/
public void sendLog(LogMessage logMessage) {
try {
// 设置消息ID,用于追踪
Message message = MessageBuilder
.withBody(JSON.toJSONString(logMessage).getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setHeader("x-app-name", logMessage.getAppName())
.setHeader("x-log-level", logMessage.getLevel())
.build();
rabbitTemplate.send(
"log.exchange",
"log." + logMessage.getAppName(),
message
);
} catch (Exception e) {
log.error("发送日志失败", e);
}
}
/**
* 发送错误日志
*/
public void sendErrorLog(String appName, String message, Throwable throwable) {
LogMessage logMessage = new LogMessage();
logMessage.setAppName(appName);
logMessage.setLevel("ERROR");
logMessage.setMessage(message);
logMessage.setStackTrace(getStackTrace(throwable));
logMessage.setThreadName(Thread.currentThread().getName());
logMessage.setTimestamp(System.currentTimeMillis());
sendLog(logMessage);
}
private String getStackTrace(Throwable throwable) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
throwable.printStackTrace(pw);
return sw.toString();
}
}
```
### 5.6 日志消费者
```java
/**
* 日志消费者
*/
@Component
@Slf4j
public class LogConsumer {
@Autowired
private ElasticsearchService elasticsearchService;
@Autowired
private LogRepository logRepository;
/**
* 消费日志消息
*/
@RabbitListener(queues = "log.queue", containerFactory = "logListenerContainerFactory")
public void handleLog(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
String json = new String(message.getBody());
LogMessage logMessage = JSON.parseObject(json, LogMessage.class);
log.info("收到日志消息: {} - {}", logMessage.getAppName(), logMessage.getMessage());
// 存储到数据库
LogEntity logEntity = convertToEntity(logMessage);
logRepository.save(logEntity);
// 存储到Elasticsearch
elasticsearchService.indexLog(logMessage);
// 手动确认
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("处理日志消息失败", e);
try {
// 拒绝消息,不重新入队(避免重复处理)
channel.basicNack(deliveryTag, false, false);
} catch (IOException ex) {
log.error("消息拒绝失败", ex);
}
}
}
private LogEntity convertToEntity(LogMessage logMessage) {
LogEntity entity = new LogEntity();
entity.setAppName(logMessage.getAppName());
entity.setLevel(logMessage.getLevel());
entity.setMessage(logMessage.getMessage());
entity.setStackTrace(logMessage.getStackTrace());
entity.setThreadName(logMessage.getThreadName());
entity.setClassName(logMessage.getClassName());
entity.setMethodName(logMessage.getMethodName());
entity.setTimestamp(new Date(logMessage.getTimestamp()));
return entity;
}
}
```
### 5.7 消费者工厂配置
```java
/**
* 日志消费者工厂配置
*/
@Configuration
public class LogListenerContainerFactory {
@Bean
public RabbitListenerContainerFactory> logListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
// 设置手动确认
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置并发消费者
factory.setConcurrentConsumers(5);
factory.setMaxConcurrentConsumers(10);
// 设置预取数量
factory.setPrefetch(1);
// 设置重试
factory.setRetryStateBuilder(
new RetryInterceptorBuilder.StatelessRetryInterceptorBuilder()
.maxAttempts(3)
.backOffOptions(1000, 2, 10000)
.build()
);
return factory;
}
}
```
### 5.8 应用集成
```java
/**
* 自定义日志Appender
*/
public class RabbitMQAppender extends UnsynchronizedAppenderBase {
@Autowired
private LogProducer logProducer;
@Override
protected void append(ILoggingEvent event) {
try {
LogMessage logMessage = new LogMessage();
logMessage.setAppName(getAppName());
logMessage.setLevel(event.getLevel().toString());
logMessage.setMessage(event.getFormattedMessage());
logMessage.setThreadName(event.getThreadName());
logMessage.setClassName(event.getLoggerName());
logMessage.setTimestamp(System.currentTimeMillis());
// 获取MDC中的traceId
logMessage.setTraceId(MDC.get("traceId"));
logProducer.sendLog(logMessage);
} catch (Exception e) {
// 日志发送失败不影响业务
System.err.println("发送日志到MQ失败: " + e.getMessage());
}
}
private String getAppName() {
return "ecommerce-order";
}
}
```
---
## 6. 项目实战三:分布式事务处理
### 6.1 项目背景
实现一个分布式事务场景:用户下单后,需要:
1. 创建订单(订单服务)
2. 扣减库存(库存服务)
3. 扣减余额(用户服务)
这三个操作必须保证一致性,要么全部成功,要么全部失败。
### 6.2 最终一致性方案
使用消息队列实现最终一致性:
```
订单服务 → MQ → 库存服务
↓
用户服务
```
### 6.3 事务消息发送
```java
/**
* 事务消息生产者
*/
@Component
@Slf4j
public class TransactionalMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送事务消息
*/
public void sendTransactionalMessage(Message message) {
// 使用事务模板发送消息
rabbitTemplate.execute(channel -> {
try {
// 开启事务
channel.txSelect();
// 发送消息
rabbitTemplate.send(message);
// 提交事务
channel.txCommit();
log.info("事务消息发送成功");
} catch (Exception e) {
// 回滚事务
try {
channel.txRollback();
} catch (IOException ex) {
log.error("事务回滚失败", ex);
}
throw new RuntimeException("发送事务消息失败", e);
}
return null;
});
}
}
```
### 6.4 本地消息表方案
```java
/**
* 本地消息表
*/
@Data
@Entity
@Table(name = "t_local_message")
public class LocalMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 业务ID
*/
private String businessId;
/**
* 消息内容
*/
@Column(columnDefinition = "TEXT")
private String message;
/**
* 交换机
*/
private String exchange;
/**
* 路由键
*/
private String routingKey;
/**
* 状态:0-待发送,1-已发送,2-发送失败
*/
private Integer status;
/**
* 重试次数
*/
private Integer retryCount;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}
```
```java
/**
* 本地消息服务
*/
@Service
@Slf4j
public class LocalMessageService {
@Autowired
private LocalMessageRepository localMessageRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 保存本地消息
*/
@Transactional
public void saveMessage(String businessId, Object message,
String exchange, String routingKey) {
LocalMessage localMessage = new LocalMessage();
localMessage.setBusinessId(businessId);
localMessage.setMessage(JSON.toJSONString(message));
localMessage.setExchange(exchange);
localMessage.setRoutingKey(routingKey);
localMessage.setStatus(0);
localMessage.setRetryCount(0);
localMessage.setCreateTime(LocalDateTime.now());
localMessage.setUpdateTime(LocalDateTime.now());
localMessageRepository.save(localMessage);
}
/**
* 发送消息
*/
public void sendMessage(LocalMessage localMessage) {
try {
rabbitTemplate.convertAndSend(
localMessage.getExchange(),
localMessage.getRoutingKey(),
JSON.parseObject(localMessage.getMessage())
);
// 更新状态为已发送
localMessage.setStatus(1);
localMessage.setUpdateTime(LocalDateTime.now());
localMessageRepository.save(localMessage);
log.info("消息发送成功: {}", localMessage.getBusinessId());
} catch (Exception e) {
log.error("消息发送失败: {}", localMessage.getBusinessId(), e);
// 更新状态为发送失败
localMessage.setStatus(2);
localMessage.setRetryCount(localMessage.getRetryCount() + 1);
localMessage.setUpdateTime(LocalDateTime.now());
localMessageRepository.save(localMessage);
}
}
}
```
```java
/**
* 定时任务:重试失败消息
*/
@Component
@Slf4j
public class MessageRetryTask {
@Autowired
private LocalMessageRepository localMessageRepository;
@Autowired
private LocalMessageService localMessageService;
/**
* 每分钟重试一次失败消息
*/
@Scheduled(fixedRate = 60000)
public void retryFailedMessages() {
// 查询失败且重试次数小于3的消息
List messages = localMessageRepository
.findByStatusAndRetryCountLessThan(2, 3);
for (LocalMessage message : messages) {
try {
localMessageService.sendMessage(message);
} catch (Exception e) {
log.error("重试消息失败: {}", message.getBusinessId(), e);
}
}
}
}
```
### 6.5 事务消息示例
```java
/**
* 订单服务(使用本地消息表)
*/
@Service
@Slf4j
public class OrderServiceWithLocalMessage {
@Autowired
private OrderRepository orderRepository;
@Autowired
private LocalMessageService localMessageService;
/**
* 创建订单
*/
@Transactional
public Order createOrder(OrderDTO orderDTO) {
// 1. 创建订单
Order order = new Order();
order.setOrderNo(generateOrderNo());
order.setUserId(orderDTO.getUserId());
order.setProductId(orderDTO.getProductId());
order.setQuantity(orderDTO.getQuantity());
order.setAmount(orderDTO.getAmount());
order.setStatus(0);
order.setCreateTime(LocalDateTime.now());
order = orderRepository.save(order);
// 2. 保存本地消息(与订单在同一事务中)
localMessageService.saveMessage(
order.getOrderNo(),
order,
"order.exchange",
"order.inventory.created"
);
return order;
}
}
```
```java
/**
* 消息发送服务
*/
@Service
@Slf4j
public class MessageSenderService {
@Autowired
private LocalMessageRepository localMessageRepository;
@Autowired
private LocalMessageService localMessageService;
/**
* 发送待发送的消息
*/
@Scheduled(fixedDelay = 1000)
public void sendPendingMessages() {
// 查询待发送的消息
List messages = localMessageRepository.findByStatus(0);
for (LocalMessage message : messages) {
try {
localMessageService.sendMessage(message);
} catch (Exception e) {
log.error("发送消息失败: {}", message.getBusinessId(), e);
}
}
}
}
```
---
## 7. 高阶应用:消息可靠性保证
### 7.1 生产者可靠性
```java
/**
* 生产者可靠性配置
*/
@Configuration
public class ProducerReliabilityConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 开启发送确认
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息发送确认成功");
} else {
log.error("消息发送确认失败: {}", cause);
// 可以在这里进行重试
}
});
// 开启返回确认
rabbitTemplate.setReturnsCallback(returned -> {
log.error("消息无法路由: exchange={}, routingKey={}",
returned.getExchange(), returned.getRoutingKey());
// 可以在这里进行重试
});
// 开启强制返回
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
}
```
### 7.2 消费者可靠性
```java
/**
* 消费者可靠性配置
*/
@Configuration
public class ConsumerReliabilityConfig {
@Bean
public RabbitListenerContainerFactory> rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 手动确认
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 重试配置
factory.setRetryStateBuilder(
new RetryInterceptorBuilder.StatelessRetryInterceptorBuilder()
.maxAttempts(3)
.backOffOptions(1000, 2, 10000)
.recoverer(new MessageRecoverer() {
@Override
public void recover(Message message, Throwable cause) {
// 重试失败后的处理
log.error("消息处理失败,进入死信队列: {}", message);
}
})
.build()
);
return factory;
}
}
```
### 7.3 消息幂等性
```java
/**
* 幂等性消费者
*/
@Component
@Slf4j
public class IdempotentConsumer {
@Autowired
private RedisTemplate redisTemplate;
/**
* 处理消息(保证幂等性)
*/
@RabbitListener(queues = "order.queue")
public void handleMessage(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
String messageId = message.getMessageProperties().getMessageId();
// 检查是否已处理
if (isProcessed(messageId)) {
log.info("消息已处理,跳过: {}", messageId);
channel.basicAck(deliveryTag, false);
return;
}
// 处理业务逻辑
processBusiness(message);
// 标记为已处理
markAsProcessed(messageId);
// 确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("处理消息失败", e);
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
log.error("消息拒绝失败", ex);
}
}
}
private boolean isProcessed(String messageId) {
String key = "message:processed:" + messageId;
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
private void markAsProcessed(String messageId) {
String key = "message:processed:" + messageId;
// 标记7天
redisTemplate.opsForValue().set(key, "1", 7, TimeUnit.DAYS);
}
private void processBusiness(Message message) {
// 处理业务逻辑
}
}
```
### 7.4 消息顺序性
```java
/**
* 顺序消息消费者
*/
@Component
@Slf4j
public class OrderedConsumer {
/**
* 单一消费者保证顺序
*/
@RabbitListener(queues = "order.queue", containerFactory = "singleListenerContainerFactory")
public void handleMessage(Order order) {
log.info("处理订单: {}", order.getOrderNo());
// 处理订单
}
/**
* 单一消费者工厂
*/
@Bean
public RabbitListenerContainerFactory> singleListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(1); // 只有一个消费者
factory.setPrefetch(1);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
```
---
## 8. 性能优化与监控
### 8.1 性能优化配置
```java
/**
* 性能优化配置
*/
@Configuration
public class PerformanceConfig {
@Bean
public RabbitListenerContainerFactory> performanceContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 增加并发消费者
factory.setConcurrentConsumers(10);
factory.setMaxConcurrentConsumers(20);
// 增加预取数量
factory.setPrefetch(10);
// 批量确认
factory.setBatchListener(true);
factory.setBatchSize(100);
// 消费者线程池
factory.setTaskExecutor(new ThreadPoolTaskExecutor() {{
setCorePoolSize(20);
setMaxPoolSize(50);
setQueueCapacity(100);
setThreadNamePrefix("rabbit-consumer-");
}});
return factory;
}
}
```
### 8.2 监控指标
```java
/**
* MQ监控服务
*/
@Service
@Slf4j
public class MQMonitorService {
@Autowired
private RabbitAdmin rabbitAdmin;
/**
* 获取队列监控信息
*/
public Map getQueueMetrics(String queueName) {
Map metrics = new HashMap<>();
try {
// 获取队列信息
QueueInfo queueInfo = rabbitAdmin.getQueueInfo(queueName);
metrics.put("queueName", queueName.getName());
metrics.put("messageCount", queueInfo.getMessageCount());
metrics.put("consumerCount", queueInfo.getConsumerCount());
// 计算消息堆积率
long messageCount = queueInfo.getMessageCount();
long consumerCount = queueInfo.getConsumerCount();
double backlogRate = consumerCount > 0 ?
(double) messageCount / consumerCount : messageCount;
metrics.put("backlogRate", backlogRate);
} catch (Exception e) {
log.error("获取队列监控信息失败: {}", queueName, e);
}
return metrics;
}
/**
* 获取所有队列监控信息
*/
public List