document
API test

日志架构

POST

Description or Example

# 日志架构 ## 为什么需要日志? > 因为消息在传输过程中可能会丢失, 因此需要记录日志 ## 知识点 ### 将mq抽取一个微服务的若干原因 1. **抽取了mq微服务, 发送消息成功与其他微服务解耦, 如果对发送消息进行进一步封装将会非常的简单** ## 为什么使用MongoDB 1. 日志绝大部分是一些无用数据,且数据与数据之间并没有强烈的关系,因此,不需要MySQL这样的的关系型数据库 2. MongoDB以文档的形式存储数据,对于日志可读需求强的数据非常适合 3. 不适合用MySQL作为日志存储,因为日志的数据量大,会导致MySQL对日日志的查询效率变得极低,导致数据库整体压力大,系统性能下滑 4. 不适合用Redis,因为这样日志会耦合其他功能, Redis性能会被拖累 5. MongoDB是文档类型数据库,轻量级,适合日志记录 ## Docker安装MongoDB ```shell docker run --name mongodb -d -p 27017:27017 \ --privileged=true \ -v /bitmail/mongodb/data:/data/db \ --restart=always \ mongo ``` ### 创建用户名和密码 ![image.png](https://cos.easydoc.net/13568421/files/lmkrnezv.png) ### 注意 > **直接使用上面的代码即可, 官方文档的docker创建会存在两个问题, 一个是环境变量的过期, 一个是容器名称的错误** [参考地址](https://www.mongodb.com/zh-cn/compatibility/docker) ## MongoDB整合Springboot ### 1. 引入依赖 ```xml <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> ``` ### 2. 配置文件 ```java data: mongodb: host: 192.168.10.131 port: 27017 username: root password: root ``` ### 3. 创建数据库 > 直接在navicat创建数据库即可 ### 4. CURD测试 [原生API参考文档](https://mongodb.net.cn/manual/crud/) [整合Springboot的blog](https://blog.csdn.net/qq_46112274/article/details/117425532) [springbootAPI的参考blog](https://juejin.cn/post/7222676391464828986) # 整合业务 ## 声明文档和集合 ```java @Document(collection = "bitmall_mq_message") // 声明集合的名字, 如果集合不存在会自动创建 @Data @Accessors(chain = true) public class MessageLog { /** * 消息的ID */ @Id private String messageId; /** * 消息的内容 */ private String content; /** * 消息发送的目标交换机 */ @Field("to_exchange") // 指定映射关系 private String toExchange; /** * 目标的路由键 */ @Field("routing_key") private String routingKey; /** * 消息状态 */ @Field("message_status") private Integer messageStatus; /** * 创建时间 */ @Field("create_time") private DateTime createTime; /** * 更新时间 */ @Field("update_time") private DateTime updateTime; /** * 消息的类型, 否则发送消息的时候不知道发什么类型的消息 */ @Field("clazz") private String clazz; } ``` ![image.png](https://cos.easydoc.net/13568421/files/lmk1jvzu.png) ## 自定义消息的状态 ```java public enum MsgStatus { MESSAGE_NEW(0, "已发送状态"), MESSAGE_SEND(1, "已抵达MQ"), MESSAGE_UN_CATCH(2, "未抵达队列"), MESSAGE_UN_ACK(3, "成功处理消息"); private Integer code; private String msg; MsgStatus(Integer code, String msg) { this.code = code; this.msg = msg; } public Integer getCode() { return code; } public String getMsg() { return msg; } } ``` # BUG修复 ## 消息队列里面的消息不能被监听器正确处理 > 在没有引入日志系统之前, 是一切正常的, 但是, 一旦引入了日志系统, 就会出现如下情况 ```java org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'no match' threw exception ``` ### 根本原因 > **因为我们在发送消息的时候会封装一个`Message`对象, 一般我们去`new`这个对象都需要传入一个`byte[]`字节数组, 之前时使用了`JSON.toJSONByte()`方法, 因此, content-type全都是`application/json`, 但是, 这种方法是不可取的, 因为这样会导致消息不能被区分开** > **因此, 我们看向源码** ![image.png](https://cos.easydoc.net/13568421/files/lmkp8bhh.png) ![image.png](https://cos.easydoc.net/13568421/files/lmkpawnf.png) ## 解决数据库时区问题 ### MySQL > 修改`my.cnf`文件 ```text [mysqld] default-time-zone=+08:00 ``` ### mongoDB > **mongoDB不需要解决时区问题, 因为mongoDB读取到Java对象的时候, 已经是东八区的时间了, 在navicat显示的时间是标准时间** > **即时间读取过来会自动变成东八区时间, 不需要解决时区问题** ### 消息重试带来的问题 #### 是否有必要消息重新入队 > **从消息丢失上的角度来说, 消息不重新入队是完全没有问题的, 因为日志数据库已经记录了该消息, 但是, 如果不重新入队, 那么这个消息被处理的就会面临长时间等待才能被处理, 用户体验极差, 因此, 还是需要重新入队** #### 怎么解决这个问题 > **可以通过配置文件解决这个问题** ```yaml spring: rabbitmq: listener: retry: enabled: true # 开启重试 max-attempts: 3# 最大重试次数 ``` > **如果不开启重试, 监听器会陷入一个奇怪的死循环中, 监听器处理失败, 进入队列, 监听器处理失败....** > <font color='red'>**如果设置了该参数, 如果当前监听器监听失败, 当前监听器不再重试, 交予其他监听器重试, 一共重试3次**</font> # 日志更新与创建代码 ```java @Bean public RabbitTemplate.ConfirmCallback confirmCallback() { return (correlationData, ack, cause) -> { // 唯一标识和消息的id一直 if (correlationData != null) { String messageId = correlationData.getId(); sendMsgUtil.callbackUpdate(messageId, MQConstant.MsgStatus.MESSAGE_ATTACH_MQ.getCode()); } }; } @Bean public RabbitTemplate.ReturnCallback returnCallback() { return (message, replyCode, replyText, exchange, routingKey) -> { // 获取消息的ID String messageId = message.getMessageProperties().getMessageId(); sendMsgUtil.callbackUpdate(messageId, MQConstant.MsgStatus.MESSAGE_UN_CATCH_QUEUE.getCode()); }; } ``` ```java @Component public class SendMsgUtil { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MongoTemplate mongoTemplate; public void convertAndSendAndSaveLog(String toExchange, String toRoutingKey, Object messageBody) { // 封装MessageProperties, 主要是想要获取消息的id MessageProperties messageProperties = new MessageProperties(); messageProperties.setMessageId(UUID.randomUUID().toString().replace("-", "")); messageProperties.setReceivedExchange(toExchange); messageProperties.setReceivedRoutingKey(toRoutingKey); // 这里构建消息需要充分区分消息 Message message = rabbitTemplate.getMessageConverter().toMessage(messageBody, messageProperties); // 继续发送消息, 取消锁定 rabbitTemplate.convertAndSend(toExchange, toRoutingKey, message, processor -> { // 发消息前记录日志 MessageLog messageLog = new MessageLog(); MessageProperties properties = processor.getMessageProperties(); messageLog.setMessageId(properties.getMessageId()) .setToExchange(properties.getReceivedExchange()) .setRoutingKey(properties.getReceivedRoutingKey()) .setContent(JSON.toJSONString(messageBody)) .setMessageStatus(MQConstant.MsgStatus.MESSAGE_SEND.getCode()) .setClazz(messageBody.getClass().getName()) .setCreateTime(new DateTime()); mongoTemplate.insert(messageLog); return message; }, new CorrelationData(messageProperties.getMessageId())); } public void callbackUpdate(String messageId, Integer status) { MessageLog messageLog = mongoTemplate.findById(messageId, MessageLog.class); if (messageLog != null) { messageLog.setMessageStatus(status) .setUpdateTime(new DateTime()); mongoTemplate.save(messageLog); } } public void finish(String messageId) { MessageLog messageLog = mongoTemplate.findById(messageId, MessageLog.class); if (messageLog != null) { messageLog.setMessageStatus(MQConstant.MsgStatus.MESSAGE_ACK.getCode()) .setUpdateTime(new DateTime()); mongoTemplate.save(messageLog); } } } ``` ```java @Configuration @EnableScheduling @EnableAsync public class ScheduleConfig { } ``` ```java @Component public class RetrySendSchedule { @Autowired private MongoTemplate mongoTemplate; @Autowired private RabbitTemplate rabbitTemplate; @Scheduled(cron = "0 0 0 * * ?") @Async public void retry() throws ClassNotFoundException { List<MessageLog> messageLogs = mongoTemplate.find(new Query() .addCriteria(Criteria .where("message_status") .in(MQConstant.MsgStatus.MESSAGE_SEND.getCode(), MQConstant.MsgStatus.MESSAGE_UN_CATCH_QUEUE.getCode())), MessageLog.class ); if (!messageLogs.isEmpty()) { for (MessageLog messageLog : messageLogs) { MessageProperties messageProperties = new MessageProperties(); messageProperties.setMessageId(messageLog.getMessageId()); messageProperties.setReceivedExchange(messageLog.getToExchange()); messageProperties.setReceivedRoutingKey(messageLog.getRoutingKey()); String content = messageLog.getContent(); Message message = rabbitTemplate.getMessageConverter().toMessage(JSON.parseObject(content, Class.forName(messageLog.getClazz())), messageProperties); rabbitTemplate.convertAndSend(messageLog.getToExchange(), messageLog.getRoutingKey(), message, new CorrelationData(messageLog.getMessageId())); // 重新发送消息不需要记录日志 } } } } ``` ## 为什么重新发送消息不需要记录日志 > **因为现在是重新发送, 原来就有对应的日志了, 如果还创建一个日志, 假设当前不成功, 那么需要重新发送的消息就会指数增加, 但是绝大部分都是没用的, 因此不需要记录日志**