2026/4/22 19:18:50
网站建设
项目流程
网站建设 橙,怎样做微网站,建设信用卡银行积分商城网站,重庆网站优化公司怎么样分布式 SAGA 模式全解与 Java 入门示例术语更正#xff1a;本文讨论的是分布式事务的 SAGA 模式#xff08;非“sage”#xff09;。SAGA 通过将一个跨服务的长事务拆分为多个本地事务#xff0c;并在失败时按逆序执行补偿事务#xff0c;实现最终一致性。它特别适合长事务…分布式 SAGA 模式全解与 Java 入门示例术语更正本文讨论的是分布式事务的SAGA 模式非“sage”。SAGA 通过将一个跨服务的长事务拆分为多个本地事务并在失败时按逆序执行补偿事务实现最终一致性。它特别适合长事务、复杂流程、可接受短暂中间状态的业务场景如电商下单全流程、物流履约、金融审批等。一、核心概念与适用场景核心思想将一个全局事务拆分为有序的本地事务链LT1 → LT2 → … → LTn。每个 LT 成功后立即提交释放资源、无全局锁并生成对应的补偿事务 CTi用于撤销影响。任意 LT 失败时按逆序执行已成功步骤的补偿CTn → … → CT1使数据回到一致状态。关键角色事务发起者 Initiator触发 SAGA。参与者 Participant执行本地事务与补偿事务的服务。协调器 Coordinator维护全局状态、推进流程、失败回滚编排式/协同式。适用场景长事务/长时间等待如用户支付、物流运输。多服务串行/并行的复杂流程。低侵入改造需求相比 TCC 少接口改造只需新增补偿。可接受最终一致性而非强一致。与其他方案对比简表方案一致性性能业务侵入典型场景2PC/3PC强一致低低依赖 XA短事务、强一致核心转账TCC最终一致高高Try/Confirm/Cancel短事务、高并发、多资源SAGA最终一致中高低新增补偿长事务、复杂流程本地消息表最终一致高低异步通知、简单流程核心挑战补偿逻辑精准性有些操作不可逆需要替代补偿如退款/召回。幂等性网络重试导致重复执行。并发冲突同一资源多 SAGA 并发修改。中间状态可见性/隔离性需通过状态标记、版本号、业务规则缓解。二、两种实现模式图解与对比编排式Choreography去中心化每个参与者通过事件/消息驱动下一步失败则广播补偿。优点无单点、耦合低缺点流程分散、全局状态难追踪、易循环依赖。协同式Orchestration中心化协调器统一定义流程与回滚顺序依次调用参与者失败按逆序补偿。优点流程集中、易维护与观测缺点协调器单点风险需高可用。示意时序简化编排式 LT1→发“T1成功”→LT2→发“T2成功”→LT3 若 LT2 失败→发“T2失败”→LT1 执行 CT1 协同式 协调器→LT1→LT2→LT3 若 LT2 失败→协调器→CT2→CT1选型建议≤3 步的简单流程编排式实现更快。多步骤/多分支/需可视化编排协同式更稳。三、Java 极简示例 协调式 SAGA无框架目标模拟“扣款 → 扣库存”失败则“恢复库存 → 冲正扣款”。强调幂等与防悬挂。领域与幂等键publicclassSagaContext{publicfinalStringsagaIdjava.util.UUID.randomUUID().toString();publicfinalStringbusinessKeyorder-1001;// 可扩展超时时间、重试次数、状态等}事务步骤接口publicinterfaceSagaStep{// 正向本地事务true成功false失败booleanexecute(SagaContextctx);// 补偿事务true补偿成功false需重试/告警booleancompensate(SagaContextctx);}两个参与者示例importjava.math.BigDecimal;importjava.util.concurrent.ConcurrentHashMap;importjava.util.concurrent.atomic.AtomicInteger;publicclassAccountStepimplementsSagaStep{// 模拟账户余额生产请用 DBprivatestaticfinalConcurrentHashMapString,BigDecimalBALANCEnewConcurrentHashMap();// 幂等与防悬挂sagaId - 已执行动作避免重复执行/补偿后正向再执行privatestaticfinalConcurrentHashMapString,StringEXEC_LOGnewConcurrentHashMap();static{BALANCE.put(A001,newBigDecimal(1000));}Overridepublicbooleanexecute(SagaContextctx){StringdoneEXEC_LOG.putIfAbsent(ctx.sagaId:minus,1);if(done!null)returntrue;// 幂等已执行过BigDecimalcurBALANCE.get(A001);if(cur.compareTo(newBigDecimal(100))0)returnfalse;BALANCE.put(A001,cur.subtract(newBigDecimal(100)));System.out.printf([Account] 扣款成功余额%ssagaId%s%n,BALANCE.get(A001),ctx.sagaId);returntrue;}Overridepublicbooleancompensate(SagaContextctx){// 防悬挂若正向未执行过也要记录补偿痕迹避免正向后补执行StringpendEXEC_LOG.putIfAbsent(ctx.sagaId:compMinus,1);if(1.equals(pend)){System.out.printf([Account] 补偿已记录或执行过sagaId%s%n,ctx.sagaId);returntrue;}BigDecimalcurBALANCE.get(A001);BALANCE.put(A001,cur.add(newBigDecimal(100)));System.out.printf([Account] 冲正成功余额%ssagaId%s%n,BALANCE.get(A001),ctx.sagaId);returntrue;}}publicclassInventoryStepimplementsSagaStep{// 模拟库存生产请用 DBprivatestaticfinalConcurrentHashMapString,AtomicIntegerSTOCKnewConcurrentHashMap();privatestaticfinalConcurrentHashMapString,StringEXEC_LOGnewConcurrentHashMap();static{STOCK.put(P100,newAtomicInteger(10));}Overridepublicbooleanexecute(SagaContextctx){StringdoneEXEC_LOG.putIfAbsent(ctx.sagaId:deduct,1);if(done!null)returntrue;AtomicIntegersSTOCK.get(P100);if(s.decrementAndGet()0){// 回滚本地变更演示用生产需事务内操作s.incrementAndGet();returnfalse;}System.out.printf([Inventory] 扣减库存成功库存%dsagaId%s%n,s.get(),ctx.sagaId);returntrue;}Overridepublicbooleancompensate(SagaContextctx){StringpendEXEC_LOG.putIfAbsent(ctx.sagaId:compDeduct,1);if(1.equals(pend)){System.out.printf([Inventory] 补偿已记录或执行过sagaId%s%n,ctx.sagaId);returntrue;}STOCK.get(P100).incrementAndGet();System.out.printf([Inventory] 恢复库存成功库存%dsagaId%s%n,STOCK.get(P100).get(),ctx.sagaId);returntrue;}}协调器与回滚importjava.util.ArrayList;importjava.util.List;publicclassSagaCoordinator{privatefinalListSagaStepstepsnewArrayList();publicSagaCoordinatoraddStep(SagaStepstep){steps.add(step);returnthis;}publicvoidexecute(SagaContextctx){ListIntegerdonenewArrayList();try{for(inti0;isteps.size();i){if(!steps.get(i).execute(ctx)){thrownewRuntimeException(步骤[i]执行失败触发回滚);}done.add(i);}System.out.printf([Saga] 执行成功sagaId%s%n,ctx.sagaId);}catch(Exceptionex){System.out.printf([Saga] 执行失败开始补偿sagaId%s原因%s%n,ctx.sagaId,ex.getMessage());// 逆序补偿for(intidone.size()-1;i0;i--){booleancompOksteps.get(i).compensate(ctx);if(!compOk){System.err.printf([Saga] 补偿步骤[%d]失败需人工介入sagaId%s%n,i,ctx.sagaId);}}}}publicstaticvoidmain(String[]args){SagaContextctxnewSagaContext();newSagaCoordinator().addStep(newAccountStep()).addStep(newInventoryStep()).execute(ctx);}}运行与验证正常库存充足时输出余额900、库存9。异常将库存初始改为0会触发“扣库存失败 → 恢复库存 → 冲正扣款”余额回到1000、库存10。关键点幂等通过ConcurrentHashMap.putIfAbsent记录已执行动作。防悬挂补偿先写日志避免补偿后再执行正向。无全局锁每个步骤本地事务提交提升吞吐。 关注公众号【云技纵横】目前正在更新分布式缓存进阶技巧和干货四、生产落地要点与框架选型幂等与去重为每个 SAGA 分配全局唯一事务IDsagaId在参与者的本地表中记录“动作类型状态业务键”用唯一索引/版本号保证幂等。可靠消息与“发件箱”模式协调器/参与者更新本地事务后将事件写入本地发件箱表再由转发器可靠投递到 MQ确保“状态变更与事件发送”原子性。超时、重试与死信队列对可重试异常使用指数退避与最大重试次数多次失败入DLQ并告警人工介入。并发与隔离通过语义锁/版本号/交换式更新/重读值等策略降低脏写风险必要时采用业务排队或分区锁。协调器高可用协同式需做主从/集群、持久化状态、故障转移与可观测性指标/日志/追踪。框架选型建议Seata SAGA基于状态机引擎编排支持条件选择、并发、子流程、参数映射、重试/捕获、补偿触发等适合复杂流程与可视化编排。阿里云 SOFABoot Saga提供参与者开发范式与防悬挂等工程化实践适合金融级场景。五、常见问题快速排查清单补偿重复执行导致“多退/多冲正”检查补偿接口幂等键sagaIdaction使用状态机或去重表拦截重复补偿。补偿失败或一直重试入DLQ、触发告警、提供管理端重试/跳过必要时人工介入。正向在补偿后“后发先至”悬挂在补偿成功时写入已补偿标记正向执行前先校验若已补偿则直接失败。并发扣减同一资源出现“负库存/错账”使用版本号/条件更新或分区串行化结合语义锁降低冲突窗口。流程变更难维护采用状态机编排集中管理流程变更只需改状态图/DSL降低耦合与回归成本。