苏州注册公司需要多少钱网站建设关健词优化网络公司怎么样
2026/1/12 15:29:03 网站建设 项目流程
苏州注册公司需要多少钱,网站建设关健词优化网络公司怎么样,百度seo工作室,网站开发仓库管理系统需求分析前言今天我们来聊聊一个让很多开发者头疼的话题——MQ消息丢失问题。有些小伙伴在工作中#xff0c;一提到消息队列就觉得很简单#xff0c;但真正遇到线上消息丢失时#xff0c;排查起来却让人抓狂。其实#xff0c;我在实际工作中#xff0c;也遇到过MQ消息丢失的情况。…前言今天我们来聊聊一个让很多开发者头疼的话题——MQ消息丢失问题。有些小伙伴在工作中一提到消息队列就觉得很简单但真正遇到线上消息丢失时排查起来却让人抓狂。其实我在实际工作中也遇到过MQ消息丢失的情况。今天这篇文章专门跟大家一起聊聊这个话题希望对你会有所帮助。一、消息丢失的三大环节在深入解决方案之前我们先搞清楚消息在哪几个环节可能丢失1. 生产者发送阶段网络抖动导致发送失败生产者宕机未发送Broker处理失败未返回确认2. Broker存储阶段内存消息未持久化重启丢失磁盘故障导致数据丢失集群切换时消息丢失3. 消费者处理阶段自动确认模式下处理异常消费者宕机处理中断手动确认但忘记确认理解了问题根源接下来我们看5种实用的解决方案。二、方案一生产者确认机制核心原理生产者发送消息后等待Broker确认确保消息成功到达。这是防止消息丢失的第一道防线。关键实现// RabbitMQ生产者确认配置Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate template new RabbitTemplate(connectionFactory);template.setConfirmCallback((correlationData, ack, cause) - {if (ack) {// 消息成功到达BrokermessageStatusService.markConfirmed(correlationData.getId());} else {// 发送失败触发重试retryService.scheduleRetry(correlationData.getId());}});return template;}// 可靠发送方法public void sendReliable(String exchange, String routingKey, Object message) {String messageId generateId();// 先落库保存发送状态messageStatusService.saveSendingStatus(messageId, message);// 发送持久化消息rabbitTemplate.convertAndSend(exchange, routingKey, message, msg - {msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);msg.getMessageProperties().setMessageId(messageId);return msg;}, new CorrelationData(messageId));}适用场景对消息可靠性要求高的业务金融交易、订单处理等关键业务需要精确知道消息发送结果的场景三、方案二消息持久化机制核心原理将消息保存到磁盘确保Broker重启后消息不丢失。这是防止Broker端消息丢失的关键。关键实现// 持久化队列配置Beanpublic Queue orderQueue() {return QueueBuilder.durable(order.queue) // 队列持久化.deadLetterExchange(order.dlx) // 死信交换机.build();}// 发送持久化消息public void sendPersistentMessage(Object message) {rabbitTemplate.convertAndSend(order.exchange, order.create, message, msg - {msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化return msg;});}// Kafka持久化配置Beanpublic ProducerFactoryString, Object producerFactory() {MapString, Object props new HashMap();props.put(ProducerConfig.ACKS_CONFIG, all); // 所有副本确认props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性return new DefaultKafkaProducerFactory(props);}优缺点优点有效防止Broker重启导致的消息丢失配置简单效果明显缺点磁盘IO影响性能需要足够的磁盘空间四、方案三消费者确认机制核心原理消费者处理完消息后手动向Broker发送确认Broker收到确认后才删除消息。这是保证消息不丢失的最后一道防线。关键实现// 手动确认消费者RabbitListener(queues order.queue)public void handleMessage(Order order, Message message, Channel channel) {long deliveryTag message.getMessageProperties().getDeliveryTag();try {// 业务处理orderService.processOrder(order);// 手动确认channel.basicAck(deliveryTag, false);log.info(消息处理完成: {}, order.getOrderId());} catch (Exception e) {log.error(消息处理失败: {}, order.getOrderId(), e);// 处理失败重新入队channel.basicNack(deliveryTag, false, true);}}// 消费者容器配置Beanpublic SimpleRabbitListenerContainerFactory containerFactory() {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认factory.setPrefetchCount(10); // 预取数量factory.setConcurrentConsumers(3); // 并发消费者return factory;}注意事项确保业务处理完成后再确认合理设置预取数量避免内存溢出处理异常时要正确使用NACK五、方案四事务消息机制核心原理通过事务保证本地业务操作和消息发送的原子性要么都成功要么都失败。关键实现// 本地事务表方案Transactionalpublic void createOrder(Order order) {// 1. 保存订单到数据库orderRepository.save(order);// 2. 保存消息到本地消息表LocalMessage localMessage new LocalMessage();localMessage.setBusinessId(order.getOrderId());localMessage.setContent(JSON.toJSONString(order));localMessage.setStatus(MessageStatus.PENDING);localMessageRepository.save(localMessage);// 3. 事务提交本地业务和消息存储保持一致性}// 定时任务扫描并发送消息Scheduled(fixedDelay 5000)public void sendPendingMessages() {ListLocalMessage pendingMessages localMessageRepository.findByStatus(MessageStatus.PENDING);for (LocalMessage message : pendingMessages) {try {// 发送消息到MQrabbitTemplate.convertAndSend(order.exchange, order.create, message.getContent());// 更新消息状态为已发送message.setStatus(MessageStatus.SENT);localMessageRepository.save(message);} catch (Exception e) {log.error(发送消息失败: {}, message.getId(), e);}}}// RocketMQ事务消息public void sendTransactionMessage(Order order) {TransactionMQProducer producer new TransactionMQProducer(order_producer);// 发送事务消息Message msg new Message(order_topic, create,JSON.toJSONBytes(order));TransactionSendResult result producer.sendMessageInTransaction(msg, null);if (result.getLocalTransactionState() LocalTransactionState.COMMIT_MESSAGE) {log.info(事务消息提交成功);}}适用场景需要严格保证业务和消息一致性的场景分布式事务场景金融、电商等对数据一致性要求高的业务六、方案五消息重试与死信队列核心原理通过重试机制处理临时故障通过死信队列处理最终无法消费的消息。关键实现// 重试队列配置Beanpublic Queue orderQueue() {return QueueBuilder.durable(order.queue).withArgument(x-dead-letter-exchange, order.dlx) // 死信交换机.withArgument(x-dead-letter-routing-key, order.dead).withArgument(x-message-ttl, 60000) // 60秒后进入死信.build();}// 死信队列配置Beanpublic Queue orderDeadLetterQueue() {return QueueBuilder.durable(order.dead.queue).build();}// 消费者重试逻辑RabbitListener(queues order.queue)public void handleMessageWithRetry(Order order, Message message, Channel channel) {long deliveryTag message.getMessageProperties().getDeliveryTag();try {orderService.processOrder(order);channel.basicAck(deliveryTag, false);} catch (TemporaryException e) {// 临时异常重新入队重试channel.basicNack(deliveryTag, false, true);

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询