2026/4/4 13:03:48
In this article we walk through Flink's threading model: the Mailbox.

Background
In the earlier threading model, Flink relied on a checkpointLock to keep different threads from corrupting internal state when they modified it. Controlling concurrency this way scatters synchronized (lock) blocks throughout the code, which makes it hard to read and debug. Flink also exposed the lock object to users through some APIs, and using the lock incorrectly easily led to thread-safety bugs.

To address these problems, the Flink community proposed a threading model based on a Mailbox. It is implemented with a single thread plus a blocking queue, so all modifications of internal state are performed by that one thread.

In the old threading model, checkpointLock was used mainly in three places:

- Event processing: handling and emitting events, watermarks, and barriers.
- Checkpoint: triggering checkpoints and delivering completion notifications.
- Processing-time timers: processing-time callbacks, which usually modify state.

In the Mailbox model, everything that needs to be processed is wrapped in a Mail and put into the Mailbox, and a single thread then processes the mails in order.

Definitions

Now let's look at how the Mailbox is implemented. The implementation involves the following classes: Mail, TaskMailbox, MailboxProcessor, and MailboxExecutor. We go through their definitions and roles one by one.

Mail

Mail is the most basic class in the Mailbox threading model. It wraps a message to be processed together with the action to run. Both checkpoint triggers and processing-time triggers are delivered as Mail. A Mail has the following fields:

```java
// Options; there are two: isUrgent and deferrable
private final MailOptionsImpl mailOptions;
// The action to run
private final ThrowingRunnable<? extends Exception> runnable;
// Priority. It does not decide execution order; it is there to avoid
// deadlocks between upstream and downstream operators
private final int priority;
// Description
private final String descriptionFormat;
private final Object[] descriptionArgs;
// The executor used to run the runnable
private final StreamTaskActionExecutor actionExecutor;
```

TaskMailbox

Flink stores Mail in a TaskMailbox and takes it out again when it is time to run it. The actual logic lives in TaskMailboxImpl.

```java
// All concurrent access to queue and state must be protected by this lock
private final ReentrantLock lock = new ReentrantLock();
// The queue that actually stores the mails
@GuardedBy("lock")
private final Deque<Mail> queue = new ArrayDeque<>();
// Condition tied to lock; wakes up threads waiting for mail when the queue
// goes from empty to non-empty
@GuardedBy("lock")
private final Condition notEmpty = lock.newCondition();
// State: OPEN / QUIESCED / CLOSED
@GuardedBy("lock")
private State state = OPEN;
// Reference to the designated mailbox thread
@Nonnull private final Thread taskMailboxThread;
// Performance optimization: a local batch of mails
private final Deque<Mail> batch = new ArrayDeque<>();
// Whether the queue holds new mail; performance optimization that avoids
// touching the main queue too often
private volatile boolean hasNewMail = false;
// Whether there is urgent mail; also a performance optimization that reduces
// how often the queue is scanned for urgent mail
private volatile boolean hasNewUrgentMail = false;
```

From these fields we can see that TaskMailbox stores Mail in an ArrayDeque and keeps a state field, which takes one of these values:

- OPEN: normal operation; mail can be put and taken.
- QUIESCED: quiescent; no new mail is accepted, but existing mail can still be taken.
- CLOSED: closed; no operation is allowed.

Inside TaskMailbox, concurrent access to both queue and state must be protected by lock. TaskMailbox also contains some performance optimizations. For example, it adds a batch queue: when processing mail, a batch of mails is first moved from queue to batch, and later reads are served from batch first. This reduces the number of accesses to queue and relieves lock contention.

MailboxProcessor

MailboxProcessor can be seen as the core entry point of the Mailbox machinery. Its central method is the event loop, which takes Mail from the TaskMailbox and runs the corresponding action, and otherwise runs the default action (MailboxDefaultAction). MailboxProcessor also exposes a MailboxExecutor that other components can use to submit events.

MailboxExecutor

Next comes MailboxExecutor, implemented by MailboxExecutorImpl. It has the following fields:

```java
// The underlying mailbox instance
@Nonnull private final TaskMailbox mailbox;
// Priority: the default priority of this MailboxExecutor, attached to every mail it submits
private final int priority;
// Executor that runs the mail's action
private final StreamTaskActionExecutor actionExecutor;
// The owning MailboxProcessor, mainly used by the isIdle method
private final MailboxProcessor mailboxProcessor;
```

The main job of MailboxExecutor is to put mail into the TaskMailbox, and its core method is execute. This method can be called from any thread because the mailbox handles concurrency internally.

```java
public void execute(
        MailOptions mailOptions,
        final ThrowingRunnable<? extends Exception> command,
        final String descriptionFormat,
        final Object... descriptionArgs) {
    try {
        mailbox.put(
                new Mail(
                        mailOptions,
                        command,
                        priority,
                        actionExecutor,
                        descriptionFormat,
                        descriptionArgs));
    } catch (MailboxClosedException mbex) {
        throw new RejectedExecutionException(mbex);
    }
}
```

Besides execute, MailboxExecutor has another important method: yield.

```java
public void yield() throws InterruptedException {
    Mail mail = mailbox.take(priority);
    try {
        mail.run();
    } catch (Exception ex) {
        throw WrappingRuntimeException.wrapIfNecessary(ex);
    }
}
```

The purpose of this method is to give up processing of the current event. There are two reasons for doing so:

- Leaving priorities aside, the Mailbox queue is processed in FIFO order. If the current event depends on a later event completing, a "deadlock" can occur.
- If the current event takes a long time to process, it blocks other events.

The current event therefore yields so that events of the same or higher priority get a chance to run. Note that yield may only be called by the mailbox thread itself. Flink also provides a non-blocking variant, tryYield.

Execution Flow

Main flow

When a StreamTask is created, it creates the mailboxProcessor and also keeps a reference to mainMailboxExecutor.

```java
// Fragment: tail of the constructor call that creates the mailbox
        new TaskMailboxImpl(Thread.currentThread()));
...
this.mailboxProcessor =
        new MailboxProcessor(
                this::processInput, mailbox, actionExecutor, mailboxMetricsControl);
...
this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
```

As shown, processInput is passed to MailboxProcessor as the MailboxDefaultAction. When the StreamTask starts, it calls the core method of MailboxProcessor.

```java
public final void invoke() throws Exception {
    // Allow invoking method 'invoke' without having to call 'restore' before it.
    if (!isRunning) {
        LOG.debug("Restoring during invoke will be called.");
        restoreInternal();
    }

    // final check to exit early before starting to run
    ensureNotCanceled();

    scheduleBufferDebloater();

    // let the task do its work
    getEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart();
    runMailboxLoop();

    // if this left the run() method cleanly despite the fact that this was canceled,
    // make sure the "clean shutdown" is not attempted
    ensureNotCanceled();

    afterInvoke();
}

public void runMailboxLoop() throws Exception {
    mailboxProcessor.runMailboxLoop();
}
```

The core of runMailboxLoop is a while loop that processes mail and runs the default action.

```java
public void runMailboxLoop() throws Exception {
    suspended = !mailboxLoopRunning;

    final TaskMailbox localMailbox = mailbox;

    checkState(
            localMailbox.isMailboxThread(),
            "Method must be executed by declared mailbox thread!");

    assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

    final MailboxController mailboxController = new MailboxController(this);

    while (isNextLoopPossible()) {
        // The blocking `processMail` call will not return until default action is available.
        processMail(localMailbox, false);
        if (isNextLoopPossible()) {
            mailboxDefaultAction.runDefaultAction(
                    mailboxController); // lock is acquired inside default action as needed
        }
    }
}

private boolean isNextLoopPossible() {
    // 'Suspended' can be false only when 'mailboxLoopRunning' is true.
    return !suspended;
}
```

The method starts with precondition checks: the calling thread must be the designated mailbox thread, and the TaskMailbox state must be OPEN. It then creates a MailboxController, which is how the MailboxDefaultAction interacts with the MailboxProcessor. After that it enters the while (isNextLoopPossible()) loop, which calls processMail to process the mail.

```java
private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
    // Doing this check is an optimization to only have a volatile read in the expected hot
    // path, locks are only acquired after this point.
    boolean isBatchAvailable = mailbox.createBatch();

    // Take mails non-blockingly and execute them.
    boolean processed = isBatchAvailable && processMailsNonBlocking(singleStep);
    if (singleStep) {
        return processed;
    }

    // If the default action is currently not available, we can run a blocking mailbox execution
    // until the default action becomes available again.
    processed |= processMailsWhenDefaultActionUnavailable();

    return processed;
}
```

processMail first creates a batch and then processes that batch of mail without blocking.

```java
private boolean processMailsNonBlocking(boolean singleStep) throws Exception {
    long processedMails = 0;
    Optional<Mail> maybeMail;

    while (isNextLoopPossible() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {
        if (processedMails++ == 0) {
            maybePauseIdleTimer();
        }
        runMail(maybeMail.get());
        if (singleStep) {
            break;
        }
    }
    if (processedMails > 0) {
        maybeRestartIdleTimer();
        return true;
    } else {
        return false;
    }
}

private void runMail(Mail mail) throws Exception {
    mailboxMetricsControl.getMailCounter().inc();
    mail.run();
    if (!suspended) {
        // start latency measurement on first mail that is not suspending mailbox execution,
        // i.e., on first non-poison mail, otherwise latency measurement is not started to avoid
        // overhead
        if (!mailboxMetricsControl.isLatencyMeasurementStarted()
                && mailboxMetricsControl.isLatencyMeasurementSetup()) {
            mailboxMetricsControl.startLatencyMeasurement();
        }
    }
}
```

processMailsNonBlocking calls runMail directly, which in the end calls mail.run to perform the actual action. processMailsWhenDefaultActionUnavailable covers the case where the default action is currently unavailable: it keeps calling runMail to process mail, blocking until either new mail arrives or the default action becomes available again.

Once the default action is available it is executed. The default action is StreamTask.processInput, which is where StreamRecords are processed.

```java
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
    DataInputStatus status = inputProcessor.processInput();
    switch (status) {
        case MORE_AVAILABLE:
            if (taskIsAvailable()) {
                return;
            }
            break;
        case NOTHING_AVAILABLE:
            break;
        case END_OF_RECOVERY:
            throw new IllegalStateException("We should not receive this event here.");
        case STOPPED:
            endData(StopMode.NO_DRAIN);
            return;
        case END_OF_DATA:
            endData(StopMode.DRAIN);
            notifyEndOfData();
            return;
        case END_OF_INPUT:
            // Suspend the mailbox processor, it would be resumed in afterInvoke and finished
            // after all records processed by the downstream tasks. We also suspend the default
            // actions to avoid repeat executing the empty default operation (namely process
            // records).
            controller.suspendDefaultAction();
            mailboxProcessor.suspend();
            return;
    }
    ...
}
```

When status is MORE_AVAILABLE, more data can be processed immediately; if taskIsAvailable() reports that the task is available, the method returns right away. When status is END_OF_INPUT, all inputs have finished, so the default action and the mailbox loop are suspended.

Checkpoint flow

A checkpoint is triggered by calling StreamTask.triggerCheckpointAsync.

```java
public CompletableFuture<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    checkForcedFullSnapshotSupport(checkpointOptions);

    // Unaligned checkpoints are submitted as urgent mail
    MailboxExecutor.MailOptions mailOptions =
            CheckpointOptions.AlignmentType.UNALIGNED == checkpointOptions.getAlignment()
                    ? MailboxExecutor.MailOptions.urgent()
                    : MailboxExecutor.MailOptions.options();

    CompletableFuture<Boolean> result = new CompletableFuture<>();
    mainMailboxExecutor.execute(
            mailOptions,
            () -> {
                try {
                    boolean noUnfinishedInputGates =
                            Arrays.stream(getEnvironment().getAllInputGates())
                                    .allMatch(InputGate::isFinished);

                    if (noUnfinishedInputGates) {
                        result.complete(
                                triggerCheckpointAsyncInMailbox(
                                        checkpointMetaData, checkpointOptions));
                    } else {
                        result.complete(
                                triggerUnfinishedChannelsCheckpoint(
                                        checkpointMetaData, checkpointOptions));
                    }
                } catch (Exception ex) {
                    // Report the failure both via the Future result but also to the mailbox
                    result.completeExceptionally(ex);
                    throw ex;
                }
            },
            "checkpoint %s with %s",
            checkpointMetaData,
            checkpointOptions);
    return result;
}
```

The Mail is submitted to the Mailbox by calling mainMailboxExecutor.execute. Checkpoint completion notifications are executed in the Mailbox in the same way, except that they are submitted through an executor with the highest priority (TaskMailbox.MAX_PRIORITY).

```java
private Future<Void> notifyCheckpointOperation(
        RunnableWithException runnable, String description) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    mailboxProcessor
            .getMailboxExecutor(TaskMailbox.MAX_PRIORITY)
            .execute(
                    () -> {
                        try {
                            runnable.run();
                        } catch (Exception ex) {
                            result.completeExceptionally(ex);
                            throw ex;
                        }
                        result.complete(null);
                    },
                    description);
    return result;
}
```

Summary

In this article we walked through the Mailbox-related source code. Flink uses the Mailbox threading model to simplify the code that previously depended on checkpointLock. We first looked at the core classes (Mail, TaskMailbox, MailboxProcessor, and MailboxExecutor) and then traced how events are submitted and processed in the main loop and in the checkpoint flow.
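
To make the "single thread plus blocking queue" idea concrete, here is a minimal, self-contained sketch of a mailbox loop. It is not Flink code: MiniMailbox, its execute, suspend, and runMailboxLoop methods, and the demo helper are hypothetical names that only mirror the roles of MailboxExecutor.execute, MailboxProcessor.suspend, and MailboxProcessor.runMailboxLoop. It also leaves out priorities, batching, and the OPEN/QUIESCED/CLOSED states.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy mailbox (hypothetical, not Flink's API): one thread owns the state, others only enqueue mail. */
final class MiniMailbox {
    // Mail submitted by any thread; the queue itself handles the synchronization.
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    // Read and written only on the mailbox thread, so it needs no lock.
    private boolean suspended = false;

    /** May be called from any thread, similar in spirit to MailboxExecutor.execute. */
    void execute(Runnable mail) {
        queue.add(mail);
    }

    /** Stops the loop by sending a mail, so the flag is flipped on the mailbox thread. */
    void suspend() {
        execute(() -> suspended = true);
    }

    /** Runs on the mailbox thread: drain pending mail, then run one step of the default action. */
    void runMailboxLoop(Runnable defaultAction) {
        while (!suspended) {
            Runnable mail;
            while (!suspended && (mail = queue.poll()) != null) {
                mail.run();
            }
            if (!suspended) {
                defaultAction.run();
            }
        }
    }

    /** A "timer" thread changes state only through mail; returns the final counter value. */
    static int demo() {
        MiniMailbox mailbox = new MiniMailbox();
        int[] counter = {0}; // state touched only by the mailbox thread
        Thread timer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                mailbox.execute(() -> counter[0]++);
            }
            mailbox.suspend(); // FIFO order: runs after all 1000 increments
        });
        timer.start();
        mailbox.runMailboxLoop(Thread::yield); // the default action does no real work here
        try {
            timer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1000
    }
}
```

The counter is incremented without any synchronized block because every increment runs on the thread that called runMailboxLoop; the other thread only enqueues work. This is the same trade the article describes: the locking is concentrated in the queue instead of being spread across every piece of code that touches state.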