深圳网站营销公司域名策划方案
2026/3/28 14:35:48 网站建设 项目流程
深圳网站营销公司,域名策划方案,石家庄企业网站开发,免费网站入口网站免费进一、事务消息核心原理 1.1 事务消息解决的问题 在分布式系统中#xff0c;保证本地事务与消息发送的原子性。 1.2 二阶段提交流程 java 复制 下载 // 事务消息的完整流程 ┌─────────────────┐ 1.发送半消息 ┌────────────────…一、事务消息核心原理1.1 事务消息解决的问题在分布式系统中保证本地事务与消息发送的原子性。1.2 二阶段提交流程java复制下载// 事务消息的完整流程 ┌─────────────────┐ 1.发送半消息 ┌─────────────────┐ │ ├───────────────────►│ │ │ 生产者 │ │ RocketMQ │ │ (Producer) │◄───────────────────┤ Broker │ │ │ 2.半消息发送成功 │ │ └─────────┬───────┘ └─────────┬───────┘ │ │ │ 3.执行本地事务 │ ▼ │ ┌─────────────────┐ │ │ 本地事务执行结果 │ │ │ (Commit/Rollback)│ │ └─────────┬───────┘ │ │ │ │ 4.提交或回滚事务状态 │ └──────────────────────────────────────►┌─────────────────┐ │ 事务状态回查 │ │ (如果超时未确认) │ └─────────────────┘二、核心代码实现2.1 生产者端实现java复制下载// 事务消息生产者 public class TransactionProducer { private final TransactionMQProducer producer; private final TransactionListener transactionListener; public TransactionProducer() { // 1. 创建事务消息生产者 producer new TransactionMQProducer(TransactionProducerGroup); producer.setNamesrvAddr(localhost:9876); // 2. 设置事务监听器核心组件 transactionListener new LocalTransactionListenerImpl(); producer.setTransactionListener(transactionListener); // 3. 设置事务回查线程池 ExecutorService executorService new ThreadPoolExecutor( 2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), r - new Thread(r, Transaction-Check-Thread) ); producer.setExecutorService(executorService); // 4. 启动生产者 producer.start(); } // 发送事务消息 public SendResult sendTransactionMessage(String topic, String tags, Object businessData) throws Exception { // 1. 构建消息 Message msg new Message(topic, tags, JSON.toJSONBytes(businessData)); // 2. 设置事务ID用于关联本地事务 String transactionId UUID.randomUUID().toString(); msg.putUserProperty(TRANSACTION_ID, transactionId); // 3. 发送半消息第一阶段 SendResult sendResult producer.sendMessageInTransaction(msg, null); System.out.println(半消息发送结果: sendResult.getSendStatus()); return sendResult; } } // 事务监听器实现核心 public class LocalTransactionListenerImpl implements TransactionListener { // 本地事务执行状态存储 private final MapString, LocalTransactionState transactionStateMap new ConcurrentHashMap(); private final TransactionService transactionService; public LocalTransactionListenerImpl() { this.transactionService new TransactionService(); } /** * 第一阶段执行本地事务 * 当半消息发送成功后Broker会回调此方法 */ Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String transactionId msg.getUserProperty(TRANSACTION_ID); System.out.println(执行本地事务事务ID: transactionId); try { // 1. 执行本地业务逻辑如数据库操作 boolean success transactionService.executeBusiness(msg); if (success) { // 本地事务成功提交消息 transactionStateMap.put(transactionId, LocalTransactionState.COMMIT_MESSAGE); System.out.println(本地事务执行成功准备提交消息); return LocalTransactionState.COMMIT_MESSAGE; } else { // 本地事务失败回滚消息 transactionStateMap.put(transactionId, LocalTransactionState.ROLLBACK_MESSAGE); System.out.println(本地事务执行失败准备回滚消息); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { // 执行异常标记为未知状态等待回查 transactionStateMap.put(transactionId, LocalTransactionState.UNKNOW); System.out.println(本地事务执行异常标记为未知状态: e.getMessage()); return LocalTransactionState.UNKNOW; } } /** * 第二阶段事务状态回查 * 如果第一阶段返回UNKNOW或者Broker未收到确认会触发回查 */ Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String transactionId msg.getUserProperty(TRANSACTION_ID); System.out.println(事务状态回查事务ID: transactionId); // 1. 从存储中查询事务状态 LocalTransactionState cachedState transactionStateMap.get(transactionId); if (cachedState ! null cachedState ! LocalTransactionState.UNKNOW) { // 状态已明确直接返回 return cachedState; } // 2. 主动查询本地事务状态 try { boolean isCommitted transactionService .checkTransactionStatus(transactionId); if (isCommitted) { transactionStateMap.put(transactionId, LocalTransactionState.COMMIT_MESSAGE); return LocalTransactionState.COMMIT_MESSAGE; } else { transactionStateMap.put(transactionId, LocalTransactionState.ROLLBACK_MESSAGE); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { // 查询失败继续等待下次回查 System.out.println(事务状态回查异常: e.getMessage()); return LocalTransactionState.UNKNOW; } } } // 本地事务服务模拟 public class TransactionService { private final MapString, Boolean transactionStatus new ConcurrentHashMap(); // 执行业务逻辑 public boolean executeBusiness(Message msg) { String transactionId msg.getUserProperty(TRANSACTION_ID); try { // 模拟数据库操作这里是关键 System.out.println(开始执行业务逻辑...); // 1. 业务处理 processBusiness(msg); // 2. 数据库事务提交 commitDatabaseTransaction(transactionId); // 3. 记录事务状态 transactionStatus.put(transactionId, true); System.out.println(业务逻辑执行成功); return true; } catch (Exception e) { // 发生异常回滚数据库事务 rollbackDatabaseTransaction(transactionId); transactionStatus.put(transactionId, false); System.out.println(业务逻辑执行失败: e.getMessage()); return false; } } // 查询事务状态 public boolean checkTransactionStatus(String transactionId) { // 1. 先查缓存 Boolean status transactionStatus.get(transactionId); if (status ! null) { return status; } // 2. 查询数据库实际业务中 return queryDatabaseTransactionStatus(transactionId); } private void processBusiness(Message msg) { // 实际的业务处理逻辑 // 例如订单创建、库存扣减等 } private void commitDatabaseTransaction(String transactionId) { // 数据库事务提交 System.out.println(提交数据库事务: transactionId); } private void rollbackDatabaseTransaction(String transactionId) { // 数据库事务回滚 System.out.println(回滚数据库事务: transactionId); } private boolean queryDatabaseTransactionStatus(String transactionId) { // 查询数据库中的事务状态 System.out.println(查询数据库事务状态: transactionId); return true; // 假设查询成功 } }篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc需要全套面试笔记及答案【点击此处即可/免费获取】​​​2.2 Broker端实现核心逻辑java复制下载// Broker端的事务消息处理器简化版 public class TransactionalMessageProcessor { // 半消息存储OP队列 private final MessageStore halfMessageStore; // 事务状态存储 private final TransactionStateTable stateTable; // 事务状态回查服务 private final TransactionCheckService checkService; /** * 第一阶段接收半消息 */ public PutMessageResult putHalfMessage(MessageExtBrokerInner message) { // 1. 标记为半消息 message.setTransactionPrepared(true); // 2. 获取事务ID String transactionId message.getProperty(TRANSACTION_ID); // 3. 存储到特殊的半消息队列OP队列 PutMessageResult result halfMessageStore.putMessage(message); if (result.isOk()) { // 4. 记录事务状态初始为PREPARED stateTable.addTransactionState(transactionId, TransactionState.PREPARED, System.currentTimeMillis()); // 5. 开启回查定时任务如果生产者未及时确认 scheduleCheck(transactionId, message); } return result; } /** * 第二阶段处理事务确认 */ public void processTransactionResponse(String transactionId, TransactionState state) { // 1. 更新事务状态 stateTable.updateTransactionState(transactionId, state); // 2. 根据状态处理消息 switch (state) { case COMMIT: // 提交将消息从OP队列移动到真正的Topic commitMessage(transactionId); break; case ROLLBACK: // 回滚删除OP队列中的消息 rollbackMessage(transactionId); break; default: // 保持PREPARED状态 break; } } // 提交消息 private void commitMessage(String transactionId) { // 1. 从OP队列获取半消息 MessageExt halfMessage halfMessageStore.getMessage(transactionId); if (halfMessage ! null) { // 2. 移除半消息标记 halfMessage.setTransactionPrepared(false); // 3. 存储到真正的Topic队列 messageStore.putMessage(halfMessage); // 4. 从OP队列删除 halfMessageStore.deleteMessage(transactionId); System.out.println(事务消息提交成功: transactionId); } } // 回滚消息 private void rollbackMessage(String transactionId) { // 直接从OP队列删除 halfMessageStore.deleteMessage(transactionId); System.out.println(事务消息回滚: transactionId); } // 事务状态回查 private void scheduleCheck(String transactionId, MessageExt message) { // 设置回查延时默认1分钟 long checkDelay message.getCheckImmunityTime(); if (checkDelay 0) { checkDelay 60000; // 默认60秒 } Timer timer new Timer(Transaction-Check-Timer); timer.schedule(new TimerTask() { Override public void run() { // 检查事务状态 TransactionState currentState stateTable.getTransactionState(transactionId); if (currentState TransactionState.PREPARED) { // 状态还是PREPARED触发回查 triggerCheckBack(transactionId, message); } } }, checkDelay); } // 触发回查 private void triggerCheckBack(String transactionId, MessageExt message) { // 1. 构建回查请求 CheckTransactionStateRequestHeader requestHeader new CheckTransactionStateRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(message.getCommitLogOffset()); // 2. 发送回查请求给生产者 checkService.sendCheckRequest(transactionId, requestHeader); } } // 事务状态枚举 public enum TransactionState { PREPARED, // 已准备半消息 COMMIT, // 已提交 ROLLBACK, // 已回滚 UNKNOWN // 未知 } // 事务状态表 public class TransactionStateTable { private final MapString, TransactionStateEntry stateMap new ConcurrentHashMap(); // 事务状态条目 static class TransactionStateEntry { private TransactionState state; private long prepareTime; private long commitTime; private int checkTimes; // 回查次数 public TransactionStateEntry(TransactionState state, long prepareTime) { this.state state; this.prepareTime prepareTime; this.checkTimes 0; } } public void addTransactionState(String transactionId, TransactionState state, long timestamp) { stateMap.put(transactionId, new TransactionStateEntry(state, timestamp)); } public void updateTransactionState(String transactionId, TransactionState newState) { TransactionStateEntry entry stateMap.get(transactionId); if (entry ! null) { entry.state newState; if (newState TransactionState.COMMIT) { entry.commitTime System.currentTimeMillis(); } } } public TransactionState getTransactionState(String transactionId) { TransactionStateEntry entry stateMap.get(transactionId); return entry ! null ? entry.state : null; } }三、完整示例电商订单创建3.1 电商订单事务消息示例java复制下载// 订单服务 public class OrderService { private final TransactionProducer transactionProducer; private final OrderDao orderDao; public OrderService() { this.transactionProducer new TransactionProducer(); this.orderDao new OrderDao(); } // 创建订单使用事务消息 public boolean createOrder(OrderDTO orderDTO) { // 1. 生成订单ID String orderId generateOrderId(); orderDTO.setOrderId(orderId); // 2. 发送事务消息 try { // 构建消息体 OrderMessage orderMessage new OrderMessage(); orderMessage.setOrderId(orderId); orderMessage.setUserId(orderDTO.getUserId()); orderMessage.setAmount(orderDTO.getAmount()); orderMessage.setItems(orderDTO.getItems()); // 发送事务消息 SendResult result transactionProducer.sendTransactionMessage( ORDER_CREATE_TOPIC, CREATE, orderMessage ); System.out.println(订单创建事务消息发送成功: result.getMsgId()); return true; } catch (Exception e) { System.out.println(订单创建失败: e.getMessage()); return false; } } } // 订单事务监听器 public class OrderTransactionListener implements TransactionListener { private final OrderDao orderDao; private final InventoryService inventoryService; private final CouponService couponService; // 存储本地事务执行状态 private final MapString, OrderTransaction transactionMap new ConcurrentHashMap(); public OrderTransactionListener() { this.orderDao new OrderDao(); this.inventoryService new InventoryService(); this.couponService new CouponService(); } Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String transactionId msg.getProperty(TRANSACTION_ID); String orderId msg.getProperty(ORDER_ID); System.out.println(开始执行订单本地事务订单ID: orderId); try { // 解析消息 OrderMessage orderMessage JSON.parseObject( msg.getBody(), OrderMessage.class); // 开始数据库事务 Connection conn DatabaseUtil.getConnection(); conn.setAutoCommit(false); try { // 1. 创建订单记录 orderDao.createOrder(conn, orderMessage); // 2. 扣减库存 for (OrderItem item : orderMessage.getItems()) { inventoryService.deductInventory( conn, item.getProductId(), item.getQuantity()); } // 3. 使用优惠券 if (orderMessage.getCouponId() ! null) { couponService.useCoupon( conn, orderMessage.getUserId(), orderMessage.getCouponId() ); } // 4. 更新用户积分 updateUserPoints(conn, orderMessage); // 5. 提交数据库事务 conn.commit(); // 记录事务状态 transactionMap.put(transactionId, new OrderTransaction(orderId, true, SUCCESS)); System.out.println(订单本地事务执行成功准备提交消息); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { // 回滚数据库事务 conn.rollback(); // 记录失败状态 transactionMap.put(transactionId, new OrderTransaction(orderId, false, e.getMessage())); System.out.println(订单本地事务执行失败准备回滚消息); return LocalTransactionState.ROLLBACK_MESSAGE; } finally { conn.close(); } } catch (Exception e) { // 记录异常状态 transactionMap.put(transactionId, new OrderTransaction(orderId, false, EXCEPTION)); System.out.println(订单本地事务执行异常标记为未知状态); return LocalTransactionState.UNKNOW; } } Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String transactionId msg.getProperty(TRANSACTION_ID); String orderId msg.getProperty(ORDER_ID); System.out.println(订单事务状态回查订单ID: orderId); // 1. 先查缓存 OrderTransaction cached transactionMap.get(transactionId); if (cached ! null) { if (cached.isSuccess()) { return LocalTransactionState.COMMIT_MESSAGE; } else { return LocalTransactionState.ROLLBACK_MESSAGE; } } // 2. 查询数据库订单状态 try { OrderStatus status orderDao.queryOrderStatus(orderId); if (status OrderStatus.CREATED) { // 订单创建成功 transactionMap.put(transactionId, new OrderTransaction(orderId, true, FOUND_IN_DB)); return LocalTransactionState.COMMIT_MESSAGE; } else if (status OrderStatus.FAILED) { // 订单创建失败 transactionMap.put(transactionId, new OrderTransaction(orderId, false, FOUND_IN_DB)); return LocalTransactionState.ROLLBACK_MESSAGE; } else { // 订单不存在或状态未知 return LocalTransactionState.UNKNOW; } } catch (Exception e) { System.out.println(订单状态查询失败: e.getMessage()); return LocalTransactionState.UNKNOW; } } private void updateUserPoints(Connection conn, OrderMessage orderMessage) throws SQLException { // 更新用户积分逻辑 int points calculatePoints(orderMessage.getAmount()); orderDao.updateUserPoints(conn, orderMessage.getUserId(), points); } private int calculatePoints(BigDecimal amount) { // 积分计算规则 return amount.divide(new BigDecimal(10), 0, RoundingMode.DOWN) .intValue(); } } // 订单事务状态记录 class OrderTransaction { private String orderId; private boolean success; private String message; private long timestamp; public OrderTransaction(String orderId, boolean success, String message) { this.orderId orderId; this.success success; this.message message; this.timestamp System.currentTimeMillis(); } // getters and setters }3.2 消费者端实现java复制下载// 订单消息消费者 public class OrderConsumer { private final DefaultMQPushConsumer consumer; public OrderConsumer() { consumer new DefaultMQPushConsumer(OrderConsumerGroup); consumer.setNamesrvAddr(localhost:9876); // 订阅订单创建成功的消息 consumer.subscribe(ORDER_CREATE_TOPIC, *); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { Override public ConsumeConcurrentlyStatus consumeMessage( ListMessageExt msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { // 解析消息 OrderMessage orderMessage JSON.parseObject( msg.getBody(), OrderMessage.class); System.out.println(收到订单创建成功消息: orderMessage.getOrderId()); // 执行业务逻辑如发送通知、更新缓存等 processOrderCreated(orderMessage); } catch (Exception e) { System.out.println(处理订单消息失败: e.getMessage()); // 消费失败稍后重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } // 消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println(订单消费者启动成功); } private void processOrderCreated(OrderMessage orderMessage) { // 1. 发送订单创建成功通知 sendNotification(orderMessage); // 2. 更新订单缓存 updateOrderCache(orderMessage); // 3. 触发后续业务流程 triggerNextStep(orderMessage); } private void sendNotification(OrderMessage orderMessage) { // 发送邮件、短信通知 System.out.println(发送订单创建成功通知给用户: orderMessage.getUserId()); } private void updateOrderCache(OrderMessage orderMessage) { // 更新Redis缓存 String key order: orderMessage.getOrderId(); // redisTemplate.opsForValue().set(key, orderMessage); System.out.println(更新订单缓存: key); } private void triggerNextStep(OrderMessage orderMessage) { // 触发物流、支付等后续流程 System.out.println(触发订单后续流程: orderMessage.getOrderId()); } }四、高级特性与优化4.1 事务消息的幂等性处理java复制下载// 幂等性处理器 public class IdempotentProcessor { private final CacheString, Long processedMessages; public IdempotentProcessor() { // 使用Guava Cache自动过期 processedMessages CacheBuilder.newBuilder() .maximumSize(10000) .expireAfterWrite(10, TimeUnit.MINUTES) .build(); } /** * 检查消息是否已处理 */ public boolean isProcessed(String messageKey) { return processedMessages.getIfPresent(messageKey) ! null; } /** * 标记消息已处理 */ public void markProcessed(String messageKey) { processedMessages.put(messageKey, System.currentTimeMillis()); } /** * 幂等性消费 */ public ConsumeConcurrentlyStatus consumeWithIdempotent( MessageExt msg, ConsumerFunction function) { // 生成消息唯一标识 String messageKey generateMessageKey(msg); // 检查是否已处理 if (isProcessed(messageKey)) { System.out.println(消息已处理跳过: messageKey); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } try { // 执行业务逻辑 function.process(msg); // 标记已处理 markProcessed(messageKey); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { System.out.println(消费失败: e.getMessage()); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } private String generateMessageKey(MessageExt msg) { // 使用消息ID 业务唯一标识 String orderId msg.getProperty(ORDER_ID); if (orderId ! null) { return msg.getMsgId() _ orderId; } return msg.getMsgId(); } FunctionalInterface public interface ConsumerFunction { void process(MessageExt msg) throws Exception; } }4.2 事务消息的重试机制java复制下载// 事务消息重试管理器 public class TransactionRetryManager { private final ScheduledExecutorService scheduler; private final MapString, RetryTask retryTasks; public TransactionRetryManager() { this.scheduler Executors.newScheduledThreadPool(5); this.retryTasks new ConcurrentHashMap(); } /** * 注册重试任务 */ public void registerRetry(String transactionId, RetryStrategy strategy, RetryCallback callback) { RetryTask task new RetryTask(transactionId, strategy, callback); retryTasks.put(transactionId, task); // 第一次重试延迟 long delay strategy.getDelay(0); scheduler.schedule(task, delay, TimeUnit.MILLISECONDS); } /** * 重试任务 */ class RetryTask implements Runnable { private final String transactionId; private final RetryStrategy strategy; private final RetryCallback callback; private int retryCount 0; public RetryTask(String transactionId, RetryStrategy strategy, RetryCallback callback) { this.transactionId transactionId; this.strategy strategy; this.callback callback; } Override public void run() { try { boolean success callback.retry(); if (success) { // 重试成功移除任务 retryTasks.remove(transactionId); } else if (retryCount strategy.getMaxRetries()) { // 重试失败继续重试 retryCount; long delay strategy.getDelay(retryCount); scheduler.schedule(this, delay, TimeUnit.MILLISECONDS); } else { // 达到最大重试次数放弃 System.out.println(事务重试失败达到最大重试次数: transactionId); retryTasks.remove(transactionId); } } catch (Exception e) { System.out.println(事务重试异常: e.getMessage()); } } } // 重试策略 static class RetryStrategy { private int maxRetries 3; private long initialDelay 1000; // 1秒 private long maxDelay 60000; // 60秒 private double multiplier 2.0; // 指数退避 public long getDelay(int retryCount) { long delay (long) (initialDelay * Math.pow(multiplier, retryCount)); return Math.min(delay, maxDelay); } // getters and setters } FunctionalInterface public interface RetryCallback { boolean retry() throws Exception; } }4.3 事务消息的监控与告警java复制下载// 事务消息监控 RestController public class TransactionMonitorController { Autowired private TransactionStatsCollector statsCollector; GetMapping(/api/transaction/stats) public TransactionStats getTransactionStats() { return statsCollector.collectStats(); } GetMapping(/api/transaction/{id}/status) public TransactionStatus getTransactionStatus(PathVariable String id) { return statsCollector.getTransactionStatus(id); } GetMapping(/api/transaction/alerts) public ListTransactionAlert getActiveAlerts() { return statsCollector.getActiveAlerts(); } } // 事务统计收集器 Component public class TransactionStatsCollector { // 事务计数器 private final AtomicLong totalTransactions new AtomicLong(0); private final AtomicLong successTransactions new AtomicLong(0); private final AtomicLong failedTransactions new AtomicLong(0); private final AtomicLong pendingTransactions new AtomicLong(0); // 耗时统计 private final Histogram transactionDuration; public TransactionStatsCollector() { this.transactionDuration new Histogram( TimeUnit.SECONDS.toMillis(1), TimeUnit.MINUTES.toMillis(5), 3 ); } public void recordTransaction(String transactionId, boolean success, long duration) { totalTransactions.incrementAndGet(); if (success) { successTransactions.incrementAndGet(); } else { failedTransactions.incrementAndGet(); } transactionDuration.record(duration); // 记录到日志 logTransaction(transactionId, success, duration); } public TransactionStats collectStats() { TransactionStats stats new TransactionStats(); stats.setTotalTransactions(totalTransactions.get()); stats.setSuccessTransactions(successTransactions.get()); stats.setFailedTransactions(failedTransactions.get()); stats.setPendingTransactions(pendingTransactions.get()); stats.setSuccessRate(calculateSuccessRate()); stats.setAverageDuration(transactionDuration.getMean()); stats.setP95Duration(transactionDuration.getValue(0.95)); stats.setP99Duration(transactionDuration.getValue(0.99)); return stats; } private double calculateSuccessRate() { long total totalTransactions.get(); if (total 0) { return 0.0; } return (double) successTransactions.get() / total * 100; } private void logTransaction(String transactionId, boolean success, long duration) { // 记录到日志系统 String log String.format( Transaction: id%s, success%s, duration%dms, transactionId, success, duration ); System.out.println(log); } } // 自定义Histogram实现简化 class Histogram { private final long[] buckets; private final long[] counts; private long totalCount; private double sum; public Histogram(long min, long max, int bucketCount) { this.buckets new long[bucketCount 1]; this.counts new long[bucketCount]; long step (max - min) / bucketCount; for (int i 0; i bucketCount; i) { buckets[i] min i * step; } } public void record(long value) { totalCount; sum value; for (int i 0; i buckets.length - 1; i) { if (value buckets[i] value buckets[i 1]) { counts[i]; break; } } } public double getMean() { return totalCount 0 ? sum / totalCount : 0; } public long getValue(double percentile) { if (totalCount 0) { return 0; } long target (long) (totalCount * percentile); long accumulated 0; for (int i 0; i counts.length; i) { accumulated counts[i]; if (accumulated target) { return buckets[i 1]; } } return buckets[buckets.length - 1]; } }篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc需要全套面试笔记及答案【点击此处即可/免费获取】​​​五、常见问题与解决方案5.1 事务消息的最终一致性java复制下载// 最终一致性保障 public class EventualConsistencyGuarantee { /** * 方法1消息重试 死信队列 */ public void guaranteeByRetryAndDLQ() { // 1. 正常消费 // 2. 消费失败 - 重试N次 // 3. 仍然失败 - 进入死信队列 // 4. 人工介入处理死信 } /** * 方法2本地消息表 */ public void guaranteeByLocalMessageTable() { // 1. 业务操作 消息记录存入本地数据库同一事务 // 2. 定时任务扫描未发送消息 // 3. 发送到MQ // 4. 更新发送状态 } /** * 方法3最大努力通知 */ public void guaranteeByBestEffort() { // 1. 业务操作完成 // 2. 发送通知消息允许失败 // 3. 定时重试通知 // 4. 达到最大重试次数后记录日志 } }5.2 事务消息性能优化java复制下载// 事务消息性能优化 public class TransactionPerformanceOptimizer { // 批量发送半消息 public ListSendResult sendHalfMessagesInBatch( ListMessage messages) throws Exception { // 1. 批量发送半消息 SendResult batchResult producer.send(messages); // 2. 批量执行本地事务 ListLocalTransactionState states executeLocalTransactionsInBatch(messages); // 3. 批量提交事务状态 batchEndTransaction(batchResult, states); // 4. 返回结果 return extractSendResults(batchResult); } // 异步执行本地事务 public CompletableFutureLocalTransactionState executeLocalTransactionAsync(Message msg) { return CompletableFuture.supplyAsync(() - { try { return transactionListener.executeLocalTransaction(msg, null); } catch (Exception e) { return LocalTransactionState.UNKNOW; } }); } // 状态缓存优化 public class TransactionStateCache { private final CacheString, LocalTransactionState cache; public TransactionStateCache() { this.cache CacheBuilder.newBuilder() .maximumSize(10000) .expireAfterWrite(10, TimeUnit.MINUTES) .recordStats() // 记录统计信息 .build(); } public LocalTransactionState get(String transactionId) { return cache.getIfPresent(transactionId); } public void put(String transactionId, LocalTransactionState state) { cache.put(transactionId, state); } // 获取缓存命中率 public double getHitRate() { CacheStats stats cache.stats(); return stats.hitRate(); } } }六、总结6.1 事务消息的核心优势强一致性保证本地事务和消息发送的原子性最终一致性跨系统数据一致性解耦业务系统与消息系统解耦可靠性消息不丢失支持重试6.2 最佳实践合理设置回查时间根据业务耗时设置保证本地事务幂等性防止重复执行监控告警及时发现处理异常性能优化批量、异步、缓存6.3 注意事项事务消息会增加系统复杂性需要处理消息积压问题考虑网络分区时的容错做好数据一致性的验证RocketMQ的事务消息二阶段提交实现通过半消息、本地事务执行、事务状态回查等机制实现了分布式系统中的数据一致性保障是处理分布式事务的有效方案。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询