RabbitMQ的大致复习

# RabbitMQ ## 环境搭建 ```shell docker run -d --name rabbitmq \ -p 5671:5671 -p 5672:5672 \ -p 4639:4639 -p 25672:25672 \ --restart=always \ -p 15671:15671 -p 15672:15672 rabbitmq:management ``` ### 测试环境是否可用 `http://虚拟机ip:15672` ![image.png](https://cos.easydoc.net/13568421/files/lm3ekg0q.png) ![image.png](https://cos.easydoc.net/13568421/files/lm3em83h.png) ## 整合RabbitMQ 1. POM ```xml <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> ``` 2. YAML ```yaml rabbitmq: host: 192.168.10.131 port: 5672 username: guest password: guest ``` 3. 主启动 ```java @Configuration @EnableRabbit public class RabbitMQConfig { } ``` ### 单元测试 #### 大坑 > 这个坑出现了很多次了, 再次说明, 如果测试目录和源码目录不一致, 会导致组件无法注入的情况, 也会导致忽略文件不能正常忽略的错误 ```java @SpringBootTest @RunWith(SpringRunner.class) public class BitmallOrderApplicationTests { @Autowired private AmqpAdmin amqpAdmin; @Autowired private RabbitTemplate rabbitTemplate; @Test public void AmqpAdminTest1() { amqpAdmin.declareExchange(new DirectExchange("hello_exchange", true, false)); } @Test public void AmqpAdminTest2() { // 排他队列: 排他队列相当于加了个重量级锁, 一个队列不能被同时监听, 只能被一个客户端监听 amqpAdmin.declareQueue(new Queue("hello_queue", true, false, false)); } @Test public void AmqpAdminTest3() { amqpAdmin.declareBinding(new Binding("hello_queue", Binding.DestinationType.QUEUE, "hello_exchange" , "hello_routing_key", null)); } @Test public void sendMsg1() { // 我们需要调用如下的方法, 先将目标Java对象转换, 然后再发送, 不调用则需要自己封装Message对象 // 1. 交换机 2. 路由键 3. 消息实体 4. 唯一标识 rabbitTemplate .convertAndSend("hello_exchange", "hello_routing_key", "我是一条消息", new CorrelationData(UUID.randomUUID().toString())); } @Test public void sendMsg2() { rabbitTemplate.convertAndSend("hello_exchange", "hello_routing_key", new Mem(), new CorrelationData(UUID.randomUUID().toString())); } static class Mem implements Serializable { private Integer num = 1; } } ``` > **上述定义: 排他队列的解释 API的解释** ### 发送消息的问题 ![image.png](https://cos.easydoc.net/13568421/files/lm3gkj8d.png) > 由上图可知, Java存储的本质就是序列化, 所以, 想要存储在消息队列的对象所在的类都需要实现序列化接口, 否则会抛异常, 但是, 这种情况会影响程序扩展性, 不采用, 因此, 我们需要用JSON替代序列化 ```java @Configuration @EnableRabbit public class RabbitMQConfig { @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } } ``` ### 其他测试以及结论 ```java @Component @Slf4j @RabbitListener(queues = "hello_queue") public class HelloServiceImpl { @RabbitHandler public void listen4() { // 有了之前的操作, 为什么需要由@RabbitHandler呢 // 因此一个消息队列里面由各种类型的消息组成, 因此, 为了让不同的消息有不同的处理方式, 因此需要该注解 // 参数写不同点类型代表处理这种类型的消息 } /*@RabbitListener(queues = "hello_queue") public void listen1(Object object) { // 2023-09-03 21:29:25.068 INFO 67880 --- [ntContainer#0-1] c.j.b.o.service.impl.HelloServiceImpl : // 消息体(Body:'"消息9"' MessageProperties [headers={__TypeId__=java.lang.String}, contentType=application/json, // contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, // receivedExchange=hello_exchange, receivedRoutingKey=hello_routing_key, deliveryTag=5, // consumerTag=amq.ctag-Sx6_ThVA8lCPbrChKxt_iQ, consumerQueue=hello_queue]) // 消息类型class org.springframework.amqp.core.Message // ------------------------------------------------------------- // 说明了接受的消息是Message类型 // 由消息体本身(数据本身), 消息的属性信息组成 log.info("消息体{}, 消息类型{}", object, object.getClass()); }*/ /* @RabbitListener(queues = "hello_queue") public void listen2(String res, Channel channel) { // 说明消息体可用自动封装 // 信道所在的包是 com.rabbitmq.client.Channel, 不要导错了 log.info("消息本体的封装对象{}, 信道{}", res, channel); }*/ /* @RabbitListener(queues = "hello_queue") public void listen3(String res) { try {Thread.sleep(4000);} catch (InterruptedException e) {e.printStackTrace();} log.info("获取到消息:{}", res); // 消息不可以被重复消费, 消息只能被消费一次 // 在并发的场景下, 默认采用轮询的方式分配消息 // 如果消费者没有处理完消息, 不能处理下一个消息, 必须等待当前消息处理结束 }*/ } ``` ![image.png](https://cos.easydoc.net/13568421/files/lm3ifmoi.png) ![image.png](https://cos.easydoc.net/13568421/files/lm3ifzlx.png) > 上述定义: 为什么是Message类型, 参数可用什么(3个), 消费者的特性 ## 复习 ### 消息丢失的三大场景 > 消息丢失不仅仅有下面的场景, 还有更多的场景, 例如到交换机就丢失了, 在丢列丢失了(TTL, 溢出, NACK等), 持久化失败等场景, 我们主要讨论过程中的消息丢失 > 前提: 消息的生产到最终的消费会经历三大过程 > 第一个过程是从P->B, 即生产者到消息中间件实体 > 第二个过程是从E->Q, 即交换机到队列 > 第三个过程是从Q->C, 即队列到消费者 ### 解决消息丢失 #### 传统方案 > 如果想要解决消息丢失问题, 最传统的方案就是开启事务消息, 即在发送消息的信道上开启事务, 这确实可以解决消息丢失问题 > 但是, 如果开启了事务消息, 会导致开小变大, 吞吐量和延迟等参数变差, 因此, 不建议使用事务消息 #### 确认机制方案 ![image.png](https://cos.easydoc.net/13568421/files/lm416o4s.png) > 概括而言, 确认机制分为两段, 分别这发送消息的确认机制, 还有接受消息的应答机制 1. ComfirmCallBack > ConfirmCallBack: 被称为确认回调, 主要作用于P -> B这个过程, 即, 在单体应用的前提下, 如果RabbitMQ实体能正常获取消息, 那么就会触发确认回调, 在集群的前提下, 如果所有的节点都能正常获取消息, 才会触发确认回调 > 如果确认回调被执行, 仅仅能说明消息成功到达RabbitMQ, 不能保证其一定能推送到队列, 或者被消费, 只能保证其所在的流程是没问题的 ```yaml spring: rabbitmq: publisher-confirms: true ``` ```java @Autowired private RabbitTemplate rabbitTemplate; @Bean public RabbitTemplate.ConfirmCallback confirmCallback() { return (correlationData, ack, cause) -> { log.info("唯一标识{}, 是否可以正确收到{}, 失败的原因{}", correlationData, ack, cause); }; } @PostConstruct // 这个注解下的方法不能有任何的参数 public void initRabbitTemplate() { rabbitTemplate.setConfirmCallback(confirmCallback()); // 仅仅会自动注入组件, 并不会再次执行 } ``` 2. ReturnCallBack > RenturnCallBack: 被称为退回回调, 主要作用域E -> Q这个过程, 即, 在单体应用的前提下, 如果交换机的数据不能正常推送到队列, 那么就会触发退回回调, 在集群的前提下, 如果某一个交换机的数据不能推送到队列, 那么也会触发退回回调 > 如果没有执行退回回调, 只能说明消息能正常抵达队列, 并不能说明其一定可以被消费成功, 只能保证其所在流程是没有任何问题的 ```yaml spring: rabbitmq: publisher-returns: true template: mandatory: true # 异步执行退回回调 ``` ```java @Bean public RabbitTemplate.ReturnCallback returnCallback() { return (message, replyCode, replyText, exchange, routingKey) -> { log.info("该消息无法正常抵达队列"); log.info("退回的消息:{}, 退回状态码{}, 退回原因{}, 交换机{}, 路由键{}", message, replyCode, replyText, exchange, routingKey); }; } @PostConstruct // 这个注解下的方法不能有任何的参数 public void initRabbitTemplate() { rabbitTemplate.setConfirmCallback(confirmCallback()); // 仅仅会自动注入组件, 并不会再次执行 rabbitTemplate.setReturnCallback(returnCallback()); } ``` ```java 2023-09-04 07:18:21.734 INFO 48404 --- [168.10.131:5672] c.j.bitmall.order.config.RabbitMQConfig : 该消息无法正常抵达队列 2023-09-04 07:18:21.734 INFO 48404 --- [168.10.131:5672] c.j.bitmall.order.config.RabbitMQConfig : 退回的消息:(Body:'"消息6"' MessageProperties [headers={__TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), 退回状态码312, 退回原因NO_ROUTE, 交换机hello_exchange, 路由键hello_routing_key1 2023-09-04 07:18:21.734 INFO 48404 --- [168.10.131:5672] c.j.bitmall.order.config.RabbitMQConfig : 唯一标识null, 是否可以正确收到true, 失败的原因null ``` #### 应答机制 > 在默认情况下, 如果消费者成功接收到消息, 会自动ACK, 即自动应答, 一旦RabbitMQ的消息队列接收到确认应答, 那么就会把对应的消息删除, 如果一直获取不到应答消息, 会一直存储 > 但是, 除非在极高吞吐量要求的前提下, 不建议使用自动应答, 如果使用自动应答, 那么意味着消费者端接收到消息就应答了, 消息队列就删除了对应的消息, 如果此时宕机, 消息没有办法被正常处理, 消息就彻底丢失了 > 因此, 我们需要采用手动确认应答, 但是, 在手动确认应答的前提下, 我们不能使用批量应答, 如果我们使用批量确认应答或者是否认应答, 会见信道内所有没有应答的消息都应答了, 这部分消息会有丢失的可能 ##### 关闭微服务, 为什么RabbitMQ中消息队列里面的消息没了? > 当我们关闭微服务的时候, 监听消息队列的方法会将所有的消息给处理完, 才能正常的关闭微服务, 除非我们直接将这个进程Kill掉, 才不会继续执行 #### 是否重新入队的理解 > 如果能重新入对, 那么保证了消息会被重新给新的消费者消费, 保证了消息一定能处理成功 > 如果不重新入对, 那么意味着消息会被丢弃, 消息不一定能处理成功, 不重新入对的情况下, 一般采用死信交换机解决消息丢失问题 ##### 不ACK, NACK, REJECT的关系 > 如果不ACK, 那么就是不确认应答, 消息会一直存储在消息队列里面 > 如果NACK, 那么得分情况, 如果需要重新入对, 结果上和不ACK是同样的效果 > NACK和REJECT最大的区别是是否支持批量操作 ##### deliverTag的说明 > DeliverTag本质上也是消息唯一ID, 但是他是在信道内单调底层的, 从1开始 ##### 拒绝应答后, 为什么消息清空了? > 因为拒绝应答后, 消息重新入队了, 被其他消费者成功处理了, 因此清空了 ```yaml spring: rabbitmq: listener: simple: acknowledge-mode: manual ```