【消息队列】【从入门到可以】

管理员
# 消息队列深度实战:从入门到精通 ## 目录 1. [MQ入门:为什么需要消息队列](#1-mq入门为什么需要消息队列) 2. [MQ基础概念与核心原理](#2-mq基础概念与核心原理) 3. [快速入门:RabbitMQ实战](#3-快速入门rabbitmq实战) 4. [项目实战一:电商订单系统](#4-项目实战一电商订单系统) 5. [项目实战二:日志收集系统](#5-项目实战二日志收集系统) 6. [项目实战三:分布式事务处理](#6-项目实战三分布式事务处理) 7. [高阶应用:消息可靠性保证](#7-高阶应用消息可靠性保证) 8. [性能优化与监控](#8-性能优化与监控) 9. [多MQ对比与选型](#9-多mq对比与选型) 10. [总结与最佳实践](#10-总结与最佳实践) --- ## 1. MQ入门:为什么需要消息队列 ### 1.1 一个真实的业务场景 想象一个电商系统的下单流程: ``` 用户下单 → 创建订单 → 扣减库存 → 发送通知 → 更新积分 → 记录日志 ``` #### 场景1:同步调用的问题 ```java /** * 传统同步调用方式 * 问题:每个环节都依赖前一个环节,一个环节失败,整个流程失败 */ public class OrderServiceSync { public void placeOrder(Order order) { // 1. 创建订单 orderDao.save(order); // 2. 扣减库存(可能因为网络原因超时) inventoryService.deduct(order.getProductId(), order.getQuantity()); // 3. 发送通知(短信服务可能宕机) notificationService.send(order.getUserId(), "订单创建成功"); // 4. 更新积分(积分系统可能维护中) pointsService.add(order.getUserId(), order.getAmount()); // 5. 记录日志 logService.record(order); } } ``` **问题分析:** - 耦合度高:订单系统强依赖库存、通知、积分等系统 - 性能差:总响应时间 = 所有服务响应时间之和 - 可用性低:任何一环失败,整个流程失败 - 扩展性差:难以独立扩展各个服务 #### 场景2:使用MQ后的改进 ```java /** * 使用MQ异步解耦 */ public class OrderServiceAsync { @Autowired private RabbitTemplate rabbitTemplate; public void placeOrder(Order order) { // 1. 创建订单(核心业务,必须同步) orderDao.save(order); // 2. 发送消息到MQ(异步处理) rabbitTemplate.convertAndSend("order.exchange", "order.created", order); // 立即返回,后续处理异步完成 } } /** * 库存服务监听消息 */ @RabbitListener(queues = "order.inventory.queue") public class InventoryConsumer { public void handleOrderCreated(Order order) { // 异步扣减库存 inventoryService.deduct(order.getProductId(), order.getQuantity()); } } /** * 通知服务监听消息 */ @RabbitListener(queues = "order.notification.queue") public class NotificationConsumer { public void handleOrderCreated(Order order) { // 异步发送通知 notificationService.send(order.getUserId(), "订单创建成功"); } } ``` **改进效果:** - 解耦:订单系统不依赖其他系统 - 性能提升:主流程响应时间大幅减少 - 可用性提高:某个服务故障不影响订单创建 - 易于扩展:可以独立扩展消费者 ### 1.2 MQ的核心价值 ```java /** * MQ核心价值演示 */ public class MQValueDemo { /** * 1. 异步处理 * 场景:用户注册后发送欢迎邮件、初始化数据、赠送积分 */ public void asyncProcessing() { // 传统方式:用户需要等待所有操作完成 // 注册(100ms) + 发送邮件(200ms) + 初始化数据(150ms) + 赠送积分(100ms) = 550ms // MQ方式:用户只需等待注册完成 // 注册(100ms) = 100ms // 其他操作异步执行,不影响用户体验 } /** * 2. 应用解耦 * 场景:订单系统需要通知多个下游系统 */ public void decoupling() { // 传统方式:订单系统需要知道所有下游系统的接口 // orderService.notify(inventoryService, paymentService, logisticsService, ...) // MQ方式:订单系统只需发送消息 // 不需要知道有哪些消费者,也不关心消费者如何处理 } /** * 3. 流量削峰 * 场景:秒杀系统,瞬时流量巨大 */ public void peakShaving() { // 传统方式:瞬时100万请求直接打到数据库,数据库崩溃 // 数据库QPS: 5000 // MQ方式:请求先进入MQ,消费者按自己的能力处理 // MQ可以承受100万请求/秒 // 数据库QPS: 5000(由消费者控制) } } ``` --- ## 2. MQ基础概念与核心原理 ### 2.1 核心概念 ```java /** * MQ核心概念 */ public class MQCoreConcepts { /** * 生产者(Producer) * - 消息的发送方 * - 负责创建和发送消息 */ public static class Producer { public void send(String message) { // 发送消息到MQ } } /** * 消费者(Consumer) * - 消息的接收方 * - 负责处理消息 */ public static class Consumer { @RabbitListener(queues = "test.queue") public void receive(String message) { // 处理消息 } } /** * 队列(Queue) * - 消息的容器 * - 先进先出(FIFO) */ public static class Queue { private List messages = new LinkedList<>(); public void enqueue(Message message) { messages.add(message); } public Message dequeue() { return messages.isEmpty() ? null : messages.remove(0); } } /** * 交换机(Exchange) * - 消息路由器 * - 根据路由规则将消息发送到队列 */ public static class Exchange { public enum ExchangeType { DIRECT, // 直连交换机 TOPIC, // 主题交换机 FANOUT, // 扇出交换机 HEADERS // 头交换机 } private ExchangeType type; private Map bindings = new HashMap<>(); public void route(String routingKey, Message message) { // 根据路由规则路由消息 } } /** * 绑定(Binding) * - 交换机和队列之间的关联 * - 包含路由键 */ public static class Binding { private String queueName; private String routingKey; public Binding(String queueName, String routingKey) { this.queueName = queueName; this.routingKey = routingKey; } } } ``` ### 2.2 消息模型 ```java /** * 消息模型 */ public class MessageModels { /** * 1. 点对点模型(Point-to-Point) * - 一个消息只能被一个消费者消费 * - 消息被消费后从队列中删除 */ public static class PointToPointModel { public void demo() { // 生产者 Producer producer = new Producer(); producer.send("Hello World"); // 消费者1 Consumer consumer1 = new Consumer(); consumer1.receive(); // 收到消息 // 消费者2 Consumer consumer2 = new Consumer(); consumer2.receive(); // 收不到消息(已被消费) } } /** * 2. 发布订阅模型(Publish-Subscribe) * - 一个消息可以被多个消费者消费 * - 每个消费者都能收到消息的副本 */ public static class PublishSubscribeModel { public void demo() { // 生产者 Producer producer = new Producer(); producer.send("Hello World"); // 消费者1 Consumer consumer1 = new Consumer(); consumer1.receive(); // 收到消息 // 消费者2 Consumer consumer2 = new Consumer(); consumer2.receive(); // 也能收到消息 } } } ``` ### 2.3 消息流转过程 ```java /** * 消息流转过程 */ public class MessageFlow { /** * 完整的消息流转过程 */ public static class MessageLifecycle { public void flow() { // 1. 生产者创建消息 Message message = new Message(); message.setBody("Hello World"); message.setHeaders(Map.of("contentType", "text/plain")); // 2. 生产者发送消息到交换机 Producer producer = new Producer(); producer.send("exchange1", "routing.key.1", message); // 3. 交换机根据路由键和绑定关系路由消息 Exchange exchange = getExchange("exchange1"); List queues = exchange.route("routing.key.1", message); // 4. 消息被投递到队列 for (Queue queue : queues) { queue.enqueue(message); } // 5. 消费者从队列获取消息 Consumer consumer = new Consumer(); Message receivedMessage = consumer.receive("queue1"); // 6. 消费者处理消息 processMessage(receivedMessage); // 7. 消费者确认消息 consumer.ack(receivedMessage); // 8. 消息从队列中删除 } } } ``` --- ## 3. 快速入门:RabbitMQ实战 ### 3.1 RabbitMQ安装与配置 ```bash # Docker方式安装RabbitMQ docker run -d \ --name rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin123 \ rabbitmq:3.12-management # 访问管理界面 # http://localhost:15672 # 用户名: admin # 密码: admin123 ``` ### 3.2 Spring Boot集成 ```xml org.springframework.boot spring-boot-starter-amqp ``` ```yaml # application.yml spring: rabbitmq: host: localhost port: 5672 username: admin password: admin123 virtual-host: / # 消息确认配置 publisher-confirm-type: correlated # 发布确认 publisher-returns: true # 发布返回 listener: simple: acknowledge-mode: manual # 手动确认 prefetch: 1 # 公平分发 retry: enabled: true max-attempts: 3 ``` ### 3.3 第一个MQ程序 ```java /** * RabbitMQ配置类 */ @Configuration public class RabbitMQConfig { /** * 声明队列 */ @Bean public Queue helloQueue() { // durable: 是否持久化 // exclusive: 是否独占 // autoDelete: 是否自动删除 return new Queue("hello.queue", true, false, false); } /** * 声明交换机 */ @Bean public DirectExchange helloExchange() { return new DirectExchange("hello.exchange", true, false); } /** * 绑定队列和交换机 */ @Bean public Binding helloBinding() { return BindingBuilder .bind(helloQueue()) .to(helloExchange()) .with("hello.routing.key"); } } ``` ```java /** * 消息生产者 */ @Service public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送简单消息 */ public void sendHelloMessage() { String message = "Hello RabbitMQ!"; rabbitTemplate.convertAndSend( "hello.exchange", "hello.routing.key", message ); System.out.println("发送消息: " + message); } /** * 发送对象消息 */ public void sendOrderMessage(Order order) { rabbitTemplate.convertAndSend( "order.exchange", "order.created", order ); System.out.println("发送订单消息: " + order.getId()); } } ``` ```java /** * 消息消费者 */ @Component public class MessageConsumer { /** * 监听队列 */ @RabbitListener(queues = "hello.queue") public void receiveHelloMessage(String message) { System.out.println("收到消息: " + message); } /** * 监听订单队列 */ @RabbitListener(queues = "order.queue") public void receiveOrderMessage(Order order) { System.out.println("收到订单: " + order.getId()); // 处理订单逻辑 } } ``` ### 3.4 测试控制器 ```java /** * 测试控制器 */ @RestController @RequestMapping("/mq") public class MQTestController { @Autowired private MessageProducer messageProducer; /** * 发送简单消息 */ @GetMapping("/send") public String sendMessage() { messageProducer.sendHelloMessage(); return "消息发送成功"; } /** * 发送订单消息 */ @PostMapping("/order") public String createOrder(@RequestBody Order order) { messageProducer.sendOrderMessage(order); return "订单消息发送成功"; } } ``` --- ## 4. 项目实战一:电商订单系统 ### 4.1 项目背景 构建一个完整的电商订单系统,实现下单后异步处理以下业务: - 扣减库存 - 发送通知(短信、邮件) - 更新积分 - 记录日志 ### 4.2 项目结构 ``` ecommerce-order-system/ ├── src/main/java/ │ ├── com.example.order/ │ │ ├── config/ # 配置类 │ │ ├── controller/ # 控制器 │ │ ├── service/ # 服务层 │ │ ├── mq/ # MQ相关 │ │ │ ├── producer/ # 消息生产者 │ │ │ ├── consumer/ # 消息消费者 │ │ │ └── config/ # MQ配置 │ │ └── model/ # 数据模型 └── src/main/resources/ └── application.yml ``` ### 4.3 数据模型 ```java /** * 订单实体 */ @Data @Entity @Table(name = "t_order") public class Order { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String orderNo; private Long userId; private Long productId; private Integer quantity; private BigDecimal amount; private Integer status; // 0:待支付, 1:已支付, 2:已发货, 3:已完成 private LocalDateTime createTime; private LocalDateTime updateTime; } ``` ```java /** * 库存实体 */ @Data @Entity @Table(name = "t_inventory") public class Inventory { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private Long productId; private Integer stock; private Integer version; // 乐观锁版本号 } ``` ### 4.4 MQ配置 ```java /** * RabbitMQ配置 */ @Configuration public class OrderMQConfig { /** * 订单交换机 */ @Bean public TopicExchange orderExchange() { return new TopicExchange("order.exchange", true, false); } /** * 库存队列 */ @Bean public Queue inventoryQueue() { return new Queue("order.inventory.queue", true); } /** * 通知队列 */ @Bean public Queue notificationQueue() { return new Queue("order.notification.queue", true); } /** * 积分队列 */ @Bean public Queue pointsQueue() { return new Queue("order.points.queue", true); } /** * 日志队列 */ @Bean public Queue logQueue() { return new Queue("order.log.queue", true); } /** * 绑定库存队列 */ @Bean public Binding inventoryBinding() { return BindingBuilder .bind(inventoryQueue()) .to(orderExchange()) .with("order.inventory.#"); } /** * 绑定通知队列 */ @Bean public Binding notificationBinding() { return BindingBuilder .bind(notificationQueue()) .to(orderExchange()) .with("order.notification.#"); } /** * 绑定积分队列 */ @Bean public Binding pointsBinding() { return BindingBuilder .bind(pointsQueue()) .to(orderExchange()) .with("order.points.#"); } /** * 绑定日志队列 */ @Bean public Binding logBinding() { return BindingBuilder .bind(logQueue()) .to(orderExchange()) .with("order.log.#"); } } ``` ### 4.5 订单服务 ```java /** * 订单服务 */ @Service public class OrderService { @Autowired private OrderRepository orderRepository; @Autowired private OrderMQProducer orderMQProducer; /** * 创建订单 */ @Transactional public Order createOrder(OrderDTO orderDTO) { // 1. 创建订单 Order order = new Order(); order.setOrderNo(generateOrderNo()); order.setUserId(orderDTO.getUserId()); order.setProductId(orderDTO.getProductId()); order.setQuantity(orderDTO.getQuantity()); order.setAmount(orderDTO.getAmount()); order.setStatus(0); order.setCreateTime(LocalDateTime.now()); order.setUpdateTime(LocalDateTime.now()); order = orderRepository.save(order); // 2. 发送订单创建消息 orderMQProducer.sendOrderCreated(order); return order; } /** * 支付订单 */ @Transactional public void payOrder(String orderNo) { Order order = orderRepository.findByOrderNo(orderNo); if (order == null) { throw new BusinessException("订单不存在"); } order.setStatus(1); order.setUpdateTime(LocalDateTime.now()); orderRepository.save(order); // 发送订单支付消息 orderMQProducer.sendOrderPaid(order); } /** * 生成订单号 */ private String generateOrderNo() { return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")) + String.format("%04d", new Random().nextInt(10000)); } } ``` ### 4.6 消息生产者 ```java /** * 订单消息生产者 */ @Component public class OrderMQProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送订单创建消息 */ public void sendOrderCreated(Order order) { // 发送到库存队列 rabbitTemplate.convertAndSend( "order.exchange", "order.inventory.created", order ); // 发送到通知队列 rabbitTemplate.convertAndSend( "order.exchange", "order.notification.created", order ); // 发送到日志队列 rabbitTemplate.convertAndSend( "order.exchange", "order.log.created", order ); System.out.println("订单创建消息已发送: " + order.getOrderNo()); } /** * 发送订单支付消息 */ public void sendOrderPaid(Order order) { // 发送到积分队列 rabbitTemplate.convertAndSend( "order.exchange", "order.points.paid", order ); // 发送到通知队列 rabbitTemplate.convertAndSend( "order.exchange", "order.notification.paid", order ); System.out.println("订单支付消息已发送: " + order.getOrderNo()); } } ``` ### 4.7 消息消费者 ```java /** * 库存消费者 */ @Component @Slf4j public class InventoryConsumer { @Autowired private InventoryService inventoryService; /** * 处理订单创建,扣减库存 */ @RabbitListener(queues = "order.inventory.queue") public void handleOrderCreated(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { log.info("收到订单创建消息,开始扣减库存: {}", order.getOrderNo()); // 扣减库存 inventoryService.deductStock(order.getProductId(), order.getQuantity()); log.info("库存扣减成功: {}", order.getOrderNo()); // 手动确认消息 channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("库存扣减失败: {}", order.getOrderNo(), e); try { // 拒绝消息,重新入队 channel.basicNack(deliveryTag, false, true); } catch (IOException ex) { log.error("消息拒绝失败", ex); } } } } ``` ```java /** * 通知消费者 */ @Component @Slf4j public class NotificationConsumer { @Autowired private NotificationService notificationService; /** * 处理订单创建,发送通知 */ @RabbitListener(queues = "order.notification.queue") public void handleOrderCreated(Order order) { try { log.info("收到订单创建消息,开始发送通知: {}", order.getOrderNo()); // 发送短信通知 notificationService.sendSms(order.getUserId(), "您的订单" + order.getOrderNo() + "已创建成功"); // 发送邮件通知 notificationService.sendEmail(order.getUserId(), "订单创建成功", "您的订单" + order.getOrderNo() + "已创建成功"); log.info("通知发送成功: {}", order.getOrderNo()); } catch (Exception e) { log.error("通知发送失败: {}", order.getOrderNo(), e); } } } ``` ```java /** * 积分消费者 */ @Component @Slf4j public class PointsConsumer { @Autowired private PointsService pointsService; /** * 处理订单支付,更新积分 */ @RabbitListener(queues = "order.points.queue") public void handleOrderPaid(Order order) { try { log.info("收到订单支付消息,开始更新积分: {}", order.getOrderNo()); // 计算积分(1元=1积分) int points = order.getAmount().multiply(BigDecimal.TEN).intValue(); // 更新积分 pointsService.addPoints(order.getUserId(), points); log.info("积分更新成功: {}, 积分: {}", order.getOrderNo(), points); } catch (Exception e) { log.error("积分更新失败: {}", order.getOrderNo(), e); } } } ``` ### 4.8 测试 ```java /** * 订单测试 */ @SpringBootTest public class OrderServiceTest { @Autowired private OrderService orderService; @Test public void testCreateOrder() { OrderDTO orderDTO = new OrderDTO(); orderDTO.setUserId(1L); orderDTO.setProductId(100L); orderDTO.setQuantity(1); orderDTO.setAmount(new BigDecimal("99.99")); Order order = orderService.createOrder(orderDTO); System.out.println("订单创建成功: " + order.getOrderNo()); // 等待异步处理完成 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` --- ## 5. 项目实战二:日志收集系统 ### 5.1 项目背景 构建一个分布式日志收集系统,实现以下功能: - 各服务异步发送日志到MQ - 日志服务从MQ消费日志 - 日志存储到Elasticsearch - 支持日志查询和分析 ### 5.2 架构设计 ``` 应用服务 → RabbitMQ → 日志服务 → Elasticsearch ↓ 死信队列(处理失败消息) ``` ### 5.3 日志模型 ```java /** * 日志实体 */ @Data public class LogMessage { /** * 应用名称 */ private String appName; /** * 日志级别 */ private String level; /** * 日志内容 */ private String message; /** * 异常堆栈 */ private String stackTrace; /** * 线程名称 */ private String threadName; /** * 类名 */ private String className; /** * 方法名 */ private String methodName; /** * 时间戳 */ private Long timestamp; /** * TraceId(链路追踪) */ private String traceId; /** * 自定义字段 */ private Map fields; } ``` ### 5.4 MQ配置 ```java /** * 日志MQ配置 */ @Configuration public class LogMQConfig { /** * 日志交换机 */ @Bean public TopicExchange logExchange() { return new TopicExchange("log.exchange", true, false); } /** * 日志队列 */ @Bean public Queue logQueue() { return QueueBuilder .durable("log.queue") // 绑定死信交换机 .withArgument("x-dead-letter-exchange", "log.dlx.exchange") .withArgument("x-dead-letter-routing-key", "log.dlx") // 设置消息TTL(24小时) .withArgument("x-message-ttl", 86400000) .build(); } /** * 死信交换机 */ @Bean public DirectExchange deadLetterExchange() { return new DirectExchange("log.dlx.exchange", true, false); } /** * 死信队列 */ @Bean public Queue deadLetterQueue() { return new Queue("log.dlx.queue", true); } /** * 绑定日志队列 */ @Bean public Binding logBinding() { return BindingBuilder .bind(logQueue()) .to(logExchange()) .with("log.#"); } /** * 绑定死信队列 */ @Bean public Binding deadLetterBinding() { return BindingBuilder .bind(deadLetterQueue()) .to(deadLetterExchange()) .with("log.dlx"); } } ``` ### 5.5 日志生产者 ```java /** * 日志生产者 */ @Component @Slf4j public class LogProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送日志消息 */ public void sendLog(LogMessage logMessage) { try { // 设置消息ID,用于追踪 Message message = MessageBuilder .withBody(JSON.toJSONString(logMessage).getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .setHeader("x-app-name", logMessage.getAppName()) .setHeader("x-log-level", logMessage.getLevel()) .build(); rabbitTemplate.send( "log.exchange", "log." + logMessage.getAppName(), message ); } catch (Exception e) { log.error("发送日志失败", e); } } /** * 发送错误日志 */ public void sendErrorLog(String appName, String message, Throwable throwable) { LogMessage logMessage = new LogMessage(); logMessage.setAppName(appName); logMessage.setLevel("ERROR"); logMessage.setMessage(message); logMessage.setStackTrace(getStackTrace(throwable)); logMessage.setThreadName(Thread.currentThread().getName()); logMessage.setTimestamp(System.currentTimeMillis()); sendLog(logMessage); } private String getStackTrace(Throwable throwable) { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); throwable.printStackTrace(pw); return sw.toString(); } } ``` ### 5.6 日志消费者 ```java /** * 日志消费者 */ @Component @Slf4j public class LogConsumer { @Autowired private ElasticsearchService elasticsearchService; @Autowired private LogRepository logRepository; /** * 消费日志消息 */ @RabbitListener(queues = "log.queue", containerFactory = "logListenerContainerFactory") public void handleLog(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { String json = new String(message.getBody()); LogMessage logMessage = JSON.parseObject(json, LogMessage.class); log.info("收到日志消息: {} - {}", logMessage.getAppName(), logMessage.getMessage()); // 存储到数据库 LogEntity logEntity = convertToEntity(logMessage); logRepository.save(logEntity); // 存储到Elasticsearch elasticsearchService.indexLog(logMessage); // 手动确认 channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("处理日志消息失败", e); try { // 拒绝消息,不重新入队(避免重复处理) channel.basicNack(deliveryTag, false, false); } catch (IOException ex) { log.error("消息拒绝失败", ex); } } } private LogEntity convertToEntity(LogMessage logMessage) { LogEntity entity = new LogEntity(); entity.setAppName(logMessage.getAppName()); entity.setLevel(logMessage.getLevel()); entity.setMessage(logMessage.getMessage()); entity.setStackTrace(logMessage.getStackTrace()); entity.setThreadName(logMessage.getThreadName()); entity.setClassName(logMessage.getClassName()); entity.setMethodName(logMessage.getMethodName()); entity.setTimestamp(new Date(logMessage.getTimestamp())); return entity; } } ``` ### 5.7 消费者工厂配置 ```java /** * 日志消费者工厂配置 */ @Configuration public class LogListenerContainerFactory { @Bean public RabbitListenerContainerFactory logListenerContainerFactory( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); // 设置手动确认 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置并发消费者 factory.setConcurrentConsumers(5); factory.setMaxConcurrentConsumers(10); // 设置预取数量 factory.setPrefetch(1); // 设置重试 factory.setRetryStateBuilder( new RetryInterceptorBuilder.StatelessRetryInterceptorBuilder() .maxAttempts(3) .backOffOptions(1000, 2, 10000) .build() ); return factory; } } ``` ### 5.8 应用集成 ```java /** * 自定义日志Appender */ public class RabbitMQAppender extends UnsynchronizedAppenderBase { @Autowired private LogProducer logProducer; @Override protected void append(ILoggingEvent event) { try { LogMessage logMessage = new LogMessage(); logMessage.setAppName(getAppName()); logMessage.setLevel(event.getLevel().toString()); logMessage.setMessage(event.getFormattedMessage()); logMessage.setThreadName(event.getThreadName()); logMessage.setClassName(event.getLoggerName()); logMessage.setTimestamp(System.currentTimeMillis()); // 获取MDC中的traceId logMessage.setTraceId(MDC.get("traceId")); logProducer.sendLog(logMessage); } catch (Exception e) { // 日志发送失败不影响业务 System.err.println("发送日志到MQ失败: " + e.getMessage()); } } private String getAppName() { return "ecommerce-order"; } } ``` --- ## 6. 项目实战三:分布式事务处理 ### 6.1 项目背景 实现一个分布式事务场景:用户下单后,需要: 1. 创建订单(订单服务) 2. 扣减库存(库存服务) 3. 扣减余额(用户服务) 这三个操作必须保证一致性,要么全部成功,要么全部失败。 ### 6.2 最终一致性方案 使用消息队列实现最终一致性: ``` 订单服务 → MQ → 库存服务 ↓ 用户服务 ``` ### 6.3 事务消息发送 ```java /** * 事务消息生产者 */ @Component @Slf4j public class TransactionalMessageProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送事务消息 */ public void sendTransactionalMessage(Message message) { // 使用事务模板发送消息 rabbitTemplate.execute(channel -> { try { // 开启事务 channel.txSelect(); // 发送消息 rabbitTemplate.send(message); // 提交事务 channel.txCommit(); log.info("事务消息发送成功"); } catch (Exception e) { // 回滚事务 try { channel.txRollback(); } catch (IOException ex) { log.error("事务回滚失败", ex); } throw new RuntimeException("发送事务消息失败", e); } return null; }); } } ``` ### 6.4 本地消息表方案 ```java /** * 本地消息表 */ @Data @Entity @Table(name = "t_local_message") public class LocalMessage { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; /** * 业务ID */ private String businessId; /** * 消息内容 */ @Column(columnDefinition = "TEXT") private String message; /** * 交换机 */ private String exchange; /** * 路由键 */ private String routingKey; /** * 状态:0-待发送,1-已发送,2-发送失败 */ private Integer status; /** * 重试次数 */ private Integer retryCount; /** * 创建时间 */ private LocalDateTime createTime; /** * 更新时间 */ private LocalDateTime updateTime; } ``` ```java /** * 本地消息服务 */ @Service @Slf4j public class LocalMessageService { @Autowired private LocalMessageRepository localMessageRepository; @Autowired private RabbitTemplate rabbitTemplate; /** * 保存本地消息 */ @Transactional public void saveMessage(String businessId, Object message, String exchange, String routingKey) { LocalMessage localMessage = new LocalMessage(); localMessage.setBusinessId(businessId); localMessage.setMessage(JSON.toJSONString(message)); localMessage.setExchange(exchange); localMessage.setRoutingKey(routingKey); localMessage.setStatus(0); localMessage.setRetryCount(0); localMessage.setCreateTime(LocalDateTime.now()); localMessage.setUpdateTime(LocalDateTime.now()); localMessageRepository.save(localMessage); } /** * 发送消息 */ public void sendMessage(LocalMessage localMessage) { try { rabbitTemplate.convertAndSend( localMessage.getExchange(), localMessage.getRoutingKey(), JSON.parseObject(localMessage.getMessage()) ); // 更新状态为已发送 localMessage.setStatus(1); localMessage.setUpdateTime(LocalDateTime.now()); localMessageRepository.save(localMessage); log.info("消息发送成功: {}", localMessage.getBusinessId()); } catch (Exception e) { log.error("消息发送失败: {}", localMessage.getBusinessId(), e); // 更新状态为发送失败 localMessage.setStatus(2); localMessage.setRetryCount(localMessage.getRetryCount() + 1); localMessage.setUpdateTime(LocalDateTime.now()); localMessageRepository.save(localMessage); } } } ``` ```java /** * 定时任务:重试失败消息 */ @Component @Slf4j public class MessageRetryTask { @Autowired private LocalMessageRepository localMessageRepository; @Autowired private LocalMessageService localMessageService; /** * 每分钟重试一次失败消息 */ @Scheduled(fixedRate = 60000) public void retryFailedMessages() { // 查询失败且重试次数小于3的消息 List messages = localMessageRepository .findByStatusAndRetryCountLessThan(2, 3); for (LocalMessage message : messages) { try { localMessageService.sendMessage(message); } catch (Exception e) { log.error("重试消息失败: {}", message.getBusinessId(), e); } } } } ``` ### 6.5 事务消息示例 ```java /** * 订单服务(使用本地消息表) */ @Service @Slf4j public class OrderServiceWithLocalMessage { @Autowired private OrderRepository orderRepository; @Autowired private LocalMessageService localMessageService; /** * 创建订单 */ @Transactional public Order createOrder(OrderDTO orderDTO) { // 1. 创建订单 Order order = new Order(); order.setOrderNo(generateOrderNo()); order.setUserId(orderDTO.getUserId()); order.setProductId(orderDTO.getProductId()); order.setQuantity(orderDTO.getQuantity()); order.setAmount(orderDTO.getAmount()); order.setStatus(0); order.setCreateTime(LocalDateTime.now()); order = orderRepository.save(order); // 2. 保存本地消息(与订单在同一事务中) localMessageService.saveMessage( order.getOrderNo(), order, "order.exchange", "order.inventory.created" ); return order; } } ``` ```java /** * 消息发送服务 */ @Service @Slf4j public class MessageSenderService { @Autowired private LocalMessageRepository localMessageRepository; @Autowired private LocalMessageService localMessageService; /** * 发送待发送的消息 */ @Scheduled(fixedDelay = 1000) public void sendPendingMessages() { // 查询待发送的消息 List messages = localMessageRepository.findByStatus(0); for (LocalMessage message : messages) { try { localMessageService.sendMessage(message); } catch (Exception e) { log.error("发送消息失败: {}", message.getBusinessId(), e); } } } } ``` --- ## 7. 高阶应用:消息可靠性保证 ### 7.1 生产者可靠性 ```java /** * 生产者可靠性配置 */ @Configuration public class ProducerReliabilityConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 开启发送确认 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息发送确认成功"); } else { log.error("消息发送确认失败: {}", cause); // 可以在这里进行重试 } }); // 开启返回确认 rabbitTemplate.setReturnsCallback(returned -> { log.error("消息无法路由: exchange={}, routingKey={}", returned.getExchange(), returned.getRoutingKey()); // 可以在这里进行重试 }); // 开启强制返回 rabbitTemplate.setMandatory(true); return rabbitTemplate; } } ``` ### 7.2 消费者可靠性 ```java /** * 消费者可靠性配置 */ @Configuration public class ConsumerReliabilityConfig { @Bean public RabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // 手动确认 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 重试配置 factory.setRetryStateBuilder( new RetryInterceptorBuilder.StatelessRetryInterceptorBuilder() .maxAttempts(3) .backOffOptions(1000, 2, 10000) .recoverer(new MessageRecoverer() { @Override public void recover(Message message, Throwable cause) { // 重试失败后的处理 log.error("消息处理失败,进入死信队列: {}", message); } }) .build() ); return factory; } } ``` ### 7.3 消息幂等性 ```java /** * 幂等性消费者 */ @Component @Slf4j public class IdempotentConsumer { @Autowired private RedisTemplate redisTemplate; /** * 处理消息(保证幂等性) */ @RabbitListener(queues = "order.queue") public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { String messageId = message.getMessageProperties().getMessageId(); // 检查是否已处理 if (isProcessed(messageId)) { log.info("消息已处理,跳过: {}", messageId); channel.basicAck(deliveryTag, false); return; } // 处理业务逻辑 processBusiness(message); // 标记为已处理 markAsProcessed(messageId); // 确认消息 channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("处理消息失败", e); try { channel.basicNack(deliveryTag, false, true); } catch (IOException ex) { log.error("消息拒绝失败", ex); } } } private boolean isProcessed(String messageId) { String key = "message:processed:" + messageId; return Boolean.TRUE.equals(redisTemplate.hasKey(key)); } private void markAsProcessed(String messageId) { String key = "message:processed:" + messageId; // 标记7天 redisTemplate.opsForValue().set(key, "1", 7, TimeUnit.DAYS); } private void processBusiness(Message message) { // 处理业务逻辑 } } ``` ### 7.4 消息顺序性 ```java /** * 顺序消息消费者 */ @Component @Slf4j public class OrderedConsumer { /** * 单一消费者保证顺序 */ @RabbitListener(queues = "order.queue", containerFactory = "singleListenerContainerFactory") public void handleMessage(Order order) { log.info("处理订单: {}", order.getOrderNo()); // 处理订单 } /** * 单一消费者工厂 */ @Bean public RabbitListenerContainerFactory singleListenerContainerFactory( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(1); // 只有一个消费者 factory.setPrefetch(1); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } } ``` --- ## 8. 性能优化与监控 ### 8.1 性能优化配置 ```java /** * 性能优化配置 */ @Configuration public class PerformanceConfig { @Bean public RabbitListenerContainerFactory performanceContainerFactory( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // 增加并发消费者 factory.setConcurrentConsumers(10); factory.setMaxConcurrentConsumers(20); // 增加预取数量 factory.setPrefetch(10); // 批量确认 factory.setBatchListener(true); factory.setBatchSize(100); // 消费者线程池 factory.setTaskExecutor(new ThreadPoolTaskExecutor() {{ setCorePoolSize(20); setMaxPoolSize(50); setQueueCapacity(100); setThreadNamePrefix("rabbit-consumer-"); }}); return factory; } } ``` ### 8.2 监控指标 ```java /** * MQ监控服务 */ @Service @Slf4j public class MQMonitorService { @Autowired private RabbitAdmin rabbitAdmin; /** * 获取队列监控信息 */ public Map getQueueMetrics(String queueName) { Map metrics = new HashMap<>(); try { // 获取队列信息 QueueInfo queueInfo = rabbitAdmin.getQueueInfo(queueName); metrics.put("queueName", queueName.getName()); metrics.put("messageCount", queueInfo.getMessageCount()); metrics.put("consumerCount", queueInfo.getConsumerCount()); // 计算消息堆积率 long messageCount = queueInfo.getMessageCount(); long consumerCount = queueInfo.getConsumerCount(); double backlogRate = consumerCount > 0 ? (double) messageCount / consumerCount : messageCount; metrics.put("backlogRate", backlogRate); } catch (Exception e) { log.error("获取队列监控信息失败: {}", queueName, e); } return metrics; } /** * 获取所有队列监控信息 */ public List> getAllQueueMetrics() { List> metricsList = new ArrayList<>(); // 获取所有队列 List queues = rabbitAdmin.getQueues(); for (Queue queue : queues) { Map metrics = getQueueMetrics(queue.getName()); metricsList.add(metrics); } return metricsList; } } ``` ### 8.3 告警规则 ```java /** * MQ告警服务 */ @Service @Slf4j public class MQAlertService { @Autowired private MQMonitorService monitorService; /** * 检查队列堆积 */ @Scheduled(fixedRate = 60000) public void checkQueueBacklog() { List> metricsList = monitorService.getAllQueueMetrics(); for (Map metrics : metricsList) { long messageCount = (Long) metrics.get("messageCount"); // 消息堆积超过10000,发送告警 if (messageCount > 10000) { String queueName = (String) metrics.get("queueName"); log.warn("队列堆积告警: {} - 消息数: {}", queueName, messageCount); sendAlert(queueName, "队列堆积", messageCount); } } } /** * 检查消费者异常 */ @Scheduled(fixedRate = 60000) public void checkConsumerException() { // 检查消费者异常率 // 发送告警 } private void sendAlert(String queueName, String alertType, Object value) { // 发送告警(邮件、短信、钉钉等) log.info("发送告警: {} - {} - {}", queueName, alertType, value); } } ``` --- ## 9. 多MQ对比与选型 ### 9.1 主流MQ对比 ```java /** * 主流MQ对比 */ public class MQComparison { /** * MQ特性对比表 */ public static class ComparisonTable { public static void main(String[] args) { System.out.println("特性\t\tRabbitMQ\tKafka\tRocketMQ\tActiveMQ"); System.out.println("------------------------------------------------------------"); System.out.println("消息模型\tAMQP\t发布订阅\t发布订阅\tJMS"); System.out.println("吞吐量\t\t中等\t极高\t高\t中等"); System.out.println("可靠性\t\t高\t高\t高\t高"); System.out.println("实时性\t\t高\t中等\t高\t高"); System.out.println("消息堆积\t差\t优秀\t优秀\t差"); System.out.println("消息追踪\t支持\t支持\t支持\t支持"); System.out.println("事务消息\t支持\t支持\t支持\t支持"); System.out.println("顺序消息\t支持\t分区有序\t支持\t支持"); System.out.println("定时消息\t插件\t支持\t支持\t支持"); System.out.println("死信队列\t支持\t支持\t支持\t支持"); System.out.println("延迟队列\t插件\t支持\t支持\t支持"); System.out.println("社区活跃度\t高\t高\t高\t低"); System.out.println("学习难度\t中等\t中等\t高\t低"); } } } ``` ### 9.2 选型建议 ```java /** * MQ选型建议 */ public class MQSelection { /** * 选型决策树 */ public static class DecisionTree { public String selectMQ(Requirements requirements) { // 1. 大数据场景,需要高吞吐量 if (requirements.isBigData() && requirements.needsHighThroughput()) { return "Kafka"; } // 2. 电商、金融场景,需要高可靠性和事务支持 if (requirements.isFinancial() || requirements.isECommerce()) { if (requirements.isAliyun()) { return "RocketMQ"; } else { return "RabbitMQ"; // 或RocketMQ } } // 3. 传统Java应用,需要JMS规范 if (requirements.needJMS()) { return "ActiveMQ"; } // 4. 默认选择RabbitMQ return "RabbitMQ"; } } /** * 需求分析 */ public static class Requirements { private boolean isBigData; private boolean needsHighThroughput; private boolean isFinancial; private boolean isECommerce; private boolean isAliyun; private boolean needJMS; // getters and setters } } ``` --- ## 10. 总结与最佳实践 ### 10.1 核心要点总结 1. **MQ的核心价值** - 异步处理:提升系统响应速度 - 应用解耦:降低系统耦合度 - 流量削峰:保护后端系统 2. **消息可靠性** - 生产者:发送确认 + 重试 - MQ:持久化 + 集群 - 消费者:手动确认 + 重试 3. **最佳实践** - 消息ID:全局唯一,用于追踪 - 消息幂等:避免重复消费 - 死信队列:处理失败消息 - 监控告警:及时发现问题 ### 10.2 常见问题 ```java /** * 常见问题解决方案 */ public class CommonIssues { /** * 问题1:消息丢失 * 解决方案: * - 生产者:开启发送确认 * - MQ:消息持久化 * - 消费者:手动确认 */ public void solveMessageLoss() { // 配置发送确认 // 配置消息持久化 // 配置手动确认 } /** * 问题2:消息重复消费 * 解决方案: * - 消费者实现幂等性 * - 使用Redis记录已处理消息 */ public void solveDuplicateConsumption() { // 实现幂等性 } /** * 问题3:消息顺序性 * 解决方案: * - 单一消费者 * - 全局有序:单一分区 * - 局部有序:相同key到同一分区 */ public void solveMessageOrder() { // 单一消费者 } /** * 问题4:消息堆积 * 解决方案: * - 增加消费者 * - 增加消费者并发度 * - 优化消费者处理逻辑 */ public void solveMessageBacklog() { // 增加消费者 } } ``` ### 10.3 完整项目示例 ```java /** * 完整的Spring Boot + RabbitMQ项目配置 */ @SpringBootApplication @EnableRabbit public class MQApplication { public static void main(String[] args) { SpringApplication.run(MQApplication.class, args); } /** * RabbitMQ配置 */ @Configuration public static class RabbitConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin123"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirmType( CachingConnectionFactory.ConfirmType.CORRELATED); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); template.setMandatory(true); return template; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } /** * 监控配置 */ @Configuration @EnableScheduling public static class MonitorConfig { @Bean public MQMonitorService mqMonitorService(RabbitAdmin rabbitAdmin) { return new MQMonitorService(rabbitAdmin); } @Bean public MQAlertService mqAlertService(MQMonitorService mqMonitorService) { return new MQAlertService(mqMonitorService); } } } ``` --- ## 结语 通过本文的学习,你应该已经掌握了: 1. **MQ基础概念**:理解了MQ的核心价值和工作原理 2. **实战项目**:完成了3个完整的项目实战 3. **高级特性**:掌握了消息可靠性、幂等性、顺序性等 4. **性能优化**:了解了性能优化和监控方法 5. **MQ选型**:能够根据业务需求选择合适的MQ ### 下一步学习建议 1. **深入学习RabbitMQ**: - 学习RabbitMQ集群搭建 - 学习RabbitMQ高级特性(插件) 2. **学习其他MQ**: - Kafka:大数据场景 - RocketMQ:阿里开源,功能强大 - ActiveMQ:传统JMS 3. **云原生MQ**: - 云消息队列(阿里云、腾讯云) - Kafka on K8s 4. **实践项目**: - 分布式事务 - 事件驱动架构 - CQRS模式 消息队列是分布式系统中不可或缺的组件,掌握它的使用将大大提升你的架构能力。继续深入学习,你会发现更多精彩!
评论 0

发表评论 取消回复

Shift+Enter 换行  ·  Enter 发送
还没有评论,来发表第一条吧