2026/2/21 13:54:05
网站建设
项目流程
请给自己的网站首页布局,wordpress 如何做门户,工程公司名称,网页设计什么专业能学第一章#xff1a;那只准时敲门的“幽灵”——Checkpoint与其背后的IO风暴我们拿到的是一个极其诡异的现场#xff1a;每30分钟一次#xff0c;持续5分钟的反压。这不像是因为数据倾斜导致的“长尾”#xff0c;也不像代码逻辑死循环导致的“猝死”。它太规律了#xff0c…第一章那只准时敲门的“幽灵”——Checkpoint与其背后的IO风暴我们拿到的是一个极其诡异的现场每30分钟一次持续5分钟的反压。这不像是因为数据倾斜导致的“长尾”也不像代码逻辑死循环导致的“猝死”。它太规律了规律得像是一个精心设计的定时炸弹。在排除了业务侧的“整点秒杀”或“批量导数”后我们的目光必须从业务逻辑层下沉到Flink的Runtime以及更底层的OS层。很多人看到反压Backpressure第一反应是看inPoolUsage看Stack Trace。错了。对于这种周期性故障你首先要看的是时间轴的重合度。1.1 嫌疑人XCheckpoint的“对齐”诅咒你可能会说“我的Checkpoint设的是3分钟一次跟30分钟有什么关系”别急。让我们把Flink Web UI的Checkpoint History拉出来别只看最近的几次我要你拉过去24小时的。你需要关注的不是“Completed”而是那些Duration突然变长或者Alignment Time突然飙升的点。假设你发现了Checkpoint的时间虽然是3分钟一次但每隔10次左右约30分钟就会出现一次耗时极长的Checkpoint比如从10秒飙升到4分钟。这时候Barrier对齐Barrier Alignment就是最大的嫌疑人。在EXACTLY_ONCE语义下Flink算子需要等待所有上游通道的Barrier到齐。如果每30分钟HDFS或S3/OSS作为State Backend的存储端出现了一次写入延迟抖动或者你的网络带宽在那一刻被某个“邻居”挤占了Checkpoint的同步阶段Sync Phase就会被无限拉长。排查实操别盯着end_to_end_duration发呆那个指标太宏观没用。去Metrics里找这两个关键指标的P99和Max值checkpoint_alignment_time如果这个值周期性飙升说明数据流中有“慢车”。Barrier被卡在了某个Channel里。checkpoint_start_delay这个更隐蔽。如果这个值高说明系统在Checkpoint还没开始就卡住了通常是因为Checkpoint Lock拿不到或者Task Thread在处理一个巨大的Record根本没空响应Barrier。老鸟笔记很多时候周期性的start_delay飙升是因为你的Source端在每30分钟进行一次Partition Discovery分区发现或者元数据刷新这个操作是同步阻塞的直接导致Barrier发不出来。如果你发现Checkpoint的周期和反压周期完美重合或者反压发生在Checkpoint期间那么恭喜你范围缩小了。但这只是表象。为什么Checkpoint会变慢1.2 RocksDB的“Compaction风暴”这才是重头戏。绝大多数生产环境的大状态作业都跑在RocksDB上。RocksDB是基于LSM Tree的它的写入是极快的Append Only但读取和清理是昂贵的。你有没有配置过这个参数state.backend.rocksdb.compaction.style如果是默认的LEVEL模式RocksDB会把数据分成L0, L1, ... Ln层。当L0层满了通常很快它会触发Compaction合并压缩把数据Merge到L1。这个过程还好。但随着时间推移数据会像滚雪球一样推向高层级。高能预警当数据从L3合并到L4或者L4到L5时涉及的磁盘IO量是指数级爆炸的。如果你的业务场景中State的TTL过期时间设置得不凑巧或者你的写入量正好在30分钟左右填满了一个较大的Level阈值就会触发一次Major Compaction。这就是那“5分钟”的真相。在Compaction发生时尽管RocksDB宣称是后台线程在跑但它会疯狂抢占磁盘IO和CPU。如果你的TaskManager所在的机器磁盘IOPS比如云盘被打满Checkpoint的fsync操作就会被阻塞。一旦Checkpoint卡住反压立刻顺着算子链往上传导。验证手段不看即使你把眼珠子瞪出来也找不到原因开启RocksDB Native Metrics在flink-conf.yaml里加上state.backend.rocksdb.metrics.actual-delayed-write-rate: truestate.backend.rocksdb.metrics.background-errors: truestate.backend.rocksdb.metrics.num-running-compactions: true观察Grafana监控盯着rocksdb_num_running_compactions和rocksdb_background_errors。 如果这根曲线呈现出锯齿状并且波峰正好对应你反压的30分钟周期案子就破了。IO Wait指标切到机器监控Node Exporter看iowaitcpu usage。如果那5分钟里iowait从1%飙到了30%以上这就是典型的磁盘瓶颈。解决策略这里的坑很深。千万别直接把state.backend.rocksdb.thread.num调大那只会让IO死得更快。试着换成Universal Compaction风格state.backend.rocksdb.compaction.style: UNIVERSAL它对写更友好虽然读性能稍差但能避免周期性的IO尖峰。增大Write Buffer让更多数据在内存里合并减少落盘频次。第二章JVM的“定期大扫除”——GC STW的隐秘角落如果RocksDB和Checkpoint看起来都岁月静好IO也没有波澜那我们就得把听诊器贴到JVM的心脏上。30分钟一次持续5分钟。这个时间跨度对于一次普通的Full GC来说太长了除非你的堆有几百G且配置极烂但对于内存泄漏导致的频繁GC爆发期或者是G1 GC的Mixed GC周期是有可能的。2.1 堆外内存的“慢性中毒”Flink不仅用堆内Heap还大量使用堆外Off-Heap/Direct内存特别是Netty的网络传输Buffer。想象这样一个场景你的代码里用了一个第三方Client比如去查Redis或HBase你以为你把它关了但其实每次连接都泄露了一个DirectByteBuffer。 前25分钟内存池慢慢涨JVM觉得“我还行我能扛”。 到了第29分钟Direct Memory逼近-XX:MaxDirectMemorySize。 JVM慌了。它开始拼命调用System.gc()来回收堆外内存注意堆外内存的回收往往依赖堆内对象的finalize或Cleaner机制这需要触发Full GC。于是你看到了连续5分钟的“GC地狱模式”。CPU利用率飙升但全在做GCTask线程根本拿不到时间片处理数据导致反压。怎么抓现行别只看Heap Usage。在Grafana上把以下几个指标叠在一起看GarbageCollection Count (Old Gen / Full GC)GarbageCollection TimeBufferPool Direct Memory Used(这是关键)如果每30分钟Direct Memory Usage达到顶峰紧接着伴随一波Full GC然后Memory Usage断崖式下跌这就是典型的资源未释放问题。2.2 G1 GC的“Humongous Object”陷阱如果你用的是G1 GC现在的标配要特别小心Humongous Allocation巨型对象分配。在Flink里如果你有个算子在做窗口聚合比如WindowFunction你用了一个ListString来攒数据。 正常流量下没事。但每隔30分钟可能有一个上游的业务窗口关闭吐出一个巨大的List。 只要这个对象超过了Region Size的50%G1就会把它当做巨型对象直接分配在老年代Old Gen而且往往需要连续的Region。如果内存碎片化严重G1为了找连续空间会触发Serial Old GC单线程Full GC慢到令人发指。排查指令去TaskManager的日志里grep这个grep -i Humongous taskmanager.log或者开启GC日志详情-Xlog:gc*info,gchumongousdebug:filegc.log:time,uptime,level,tags如果你在反压期间看到了大量的G1 Humongous Allocation日志或者To-space exhausted那你就要去查查你的代码了。是不是把什么巨大的HashMap或者List塞进了State里或者在处理函数里搞了个巨大的临时变量代码审查点检查是否有ListState被全量读取到内存Wait for it...iterable.iterator()然后转成了Java List。如果State里有几十万条数据这个动作每30分钟触发一次绝对能把JVM搞崩。2.3 那个被遗忘的“Local Cache”还有一个非常容易被忽视的杀手应用层本地缓存Guava Cache / Caffeine。开发者为了加速Lookup Join通常会在RichFunction里搞个Cache。CacheBuilder.newBuilder().expireAfterWrite(30, TimeUnit.MINUTES)...看到这行代码了吗30分钟过期。如果你的Cache很大比如存了GB级别的维表数据且设置了集中过期。每过30分钟Cache里的几百万个对象同时失效。 虽然失效本身不耗时但随后的GC周期需要扫描并标记这几百万个死对象。更糟糕的是缓存失效意味着缓存击穿。 接下来的几分钟里所有的请求都会穿透到外部存储MySQL/HBase。 外部存储扛不住突发的并发响应变慢。 Flink算子等待IO处理变慢。反压形成。验证方法Review代码搜索expire关键字。看外部存储如HBase的监控在反压期间QPS是不是突然暴涨Latnecy是不是飙升解决之道给过期时间加个Jitter抖动。 不要写死30分钟写成30分钟 Random(0-300秒)。让缓存失效的时间分散开别搞“大阅兵”。第三章不仅是Flink的事儿——外部生态的“蝴蝶效应”我们不能只盯着Flink的一亩三分地。Flink是流式计算的管道管道堵了很有可能是出口被封住了或者是由于某种神秘的“引力波”干扰。3.1 HDFS/S3 的“Block回收周期”这是我亲身经历过的一个坑。 曾经有个集群也是每隔一段时间反压。查了半天Flink没问题。 最后发现是HDFS的NameNode在做CheckPointImage合并或者DataNode在做Block Report。如果你的Flink作业Sink到HDFS比如StreamingFileSink且文件滚动策略设置得比较碎小文件多。 每隔一段时间通常是小时级别但也可能配置成30分钟HDFS DataNode会向NameNode汇报全量Block信息。 如果集群文件数过亿这个Report过程会消耗大量网络带宽和NameNode锁资源。 这时候Flink请求写Block会超时。 StreamingFileSink卡在flush()或close()方法上。 反压开始。怎么看不要只看Flink日志。去翻翻HDFS DataNode的日志看看有没有Block report相关的耗时记录。或者问问运维大哥那个时间点NameNode的RPC Queue Time是不是高了。3.2 Kafka Broker的“Rebalance”与“Log Compaction”如果你的Source是Kafka。 30分钟一次有没有可能是Consumer Group发生了Rebalance 虽然Rebalance通常很快但如果你的Topic Partition巨多几千个Consumer初始化慢Rebalance可能持续几分钟。 在这几分钟里Source停止消费下游没数据——这叫“断流”不叫反压 不。 如果部分Consumer上线了部分还没上或者Rebalance导致Partition分配倾斜某些Task负载激增就会导致局部的反压。更隐蔽的是Kafka服务端的Log Compaction或Segment Rolling。 如果在30分钟时Kafka Broker所在的磁盘正在疯狂做Log CleanupIO被打满。Flink写KafkaSink端的ACK延时增加。 Sink慢全链路慢。实锤证据在Flink Metrics里看KafkaProducer的request-latency-avg。 如果这根线和反压周期吻合别查Flink了去盘Kafka Broker。3.3 容器环境的“嘈杂邻居” (Noisy Neighbor)既然你说已经排除了业务高峰那我们就假设流量是平稳的。 但是物理机是平稳的吗现在的Flink大多跑在K8s或Yarn容器上。你的TaskManager可能和别人的ElasticSearch节点或者一个定时的ETL批处理任务跑在同一台物理机上。 每30分钟那个ETL任务启动狂吃网络带宽Network Throttling或Page Cache。 OS层面的资源争抢是残酷的。如果不做绑核CPU Pinning或严格的Cgroup隔离你的CPU时间片会被偷走。Top命令大法这需要一点运气和脚本。写个脚本在反压期间自动SSH到反压严重的TaskManager所在节点执行top和iotop。 看看除了java你的TaskManager之外还有没有哪个看起来面目可憎的进程在榜首赖着不走。 如果是Yarn环境看看NodeManager的日志是不是那个点有新的Container启动了第四章时间引发的血案——Timer Storm定时器风暴Flink是基于事件时间Event Time的这大家都知道。但很多人忽略了Flink内部对时间的驱动是依赖Timer定时器的。如果你的业务逻辑里用到了窗口Window或者在ProcessFunction里注册了定时器ctx.timerService().registerEventTimeTimer(...)那么请警惕“整点效应”。4.1 窗口触发的“拥堵路口”假设你有一个TumblingWindow滚动窗口窗口大小正好是30分钟。window(TumblingEventTimeWindows.of(Time.minutes(30)))虽然Flink的数据是流式处理的每来一条处理一条。但是窗口的计算和输出是批量的。 每过30分钟Watermark推过了窗口结束时间。这时候所有Key的窗口同时触发计算Trigger Fire。如果你的Key基数Cardinality很大比如有100万个Key。 在那一毫秒Flink的TimerService会尝试触发100万个onTimer回调。 这100万个回调虽然是在各个Slot里并行执行的但对于单个Slot来说它是串行的。现象还原平时CPU利用率20%。 一到30分钟整点或者Watermark推进到的那个逻辑时间点CPU瞬间打满User CPU高System CPU低。 反压从Sink端瞬间传导到Source。 持续几分钟处理完这批聚合后一切恢复平静。但这太简单了你应该早就排除了对吧那我们看个更复杂的Watermark的周期性滞后。4.2 Watermark生成器的“摸鱼”时刻如果你的Source是Kafka且分区很多比如100个Partition而你的并行度只有10。意味着每个Source Task要读取10个Partition。Flink的Watermark是取所有Partition中最小的那个。 如果有一个Partition因为某种原因比如上游写入该Partition的Producer挂了或者网络抖动断流了30分钟。 Flink的全局Watermark就会停在原地不动。憋大招模式开启Watermark卡住30分钟。但这30分钟里其他正常的99个Partition数据还在源源不断地进来。因为Watermark没动所有依赖Watermark触发的窗口、定时器全部憋在内存里无法触发也无法清理。State越来越大。突然那个断流的Partition恢复了或者因为idleTimeout机制触发Watermark瞬间向前跳跃了30分钟。灾难发生积压了30分钟的所有定时器在这一瞬间被同时唤醒。这就好比大坝泄洪。本来应该是涓涓细流的触发变成了一次毁灭性的洪峰。这5分钟的反压就是Flink在疯狂补作业。排查核武器去Flink Web UI找到Source算子点开Watermarks指标。 不要看平均值。看Min Watermark。 观察它是不是呈“台阶式”跳跃 正常应该是平滑上升的直线。如果它是一条水平线然后突然垂直上升那就是这个问题。代码里怎么修一定要配置withIdlenessStreamExecutionEnvironment env ... env.getConfig().setAutoWatermarkInterval(200); // 在WatermarkStrategy里加上这个 WatermarkStrategy .MyTypeforBoundedOutOfOrderness(Duration.ofSeconds(5)) .withIdleness(Duration.ofMinutes(1)); // 关键这个配置的意思是如果某个分区1分钟没数据就把它标记为Idle不再用它来拖累全局Watermark。第五章僵尸连接与网络防火墙的“30分钟魔咒”如果你的Flink作业需要写数据库MySQL, PostgreSQL, Oracle或者调用外部HTTP接口。这一章请反复阅读三遍。很多公司的网络架构里在应用层和数据库层之间横亘着一道防火墙Firewall或者负载均衡SLB/LVS。 这些网络设备有一个默认的安全策略TCP Idle Timeout空闲连接超时。这个值在很多设备上默认就是30分钟。5.1 沉默的杀手场景还原 你的Sink是一个JDBC Sink或者自定义的RichSinkFunction里面维护了一个连接池Connection Pool比如HikariCP或Druid。 作业运行过程中可能某些Sink Subtask分到的数据比较稀疏或者因为之前的某种过滤逻辑导致某个数据库连接在30分钟内没有数据传输。这时候防火墙静悄悄地把这个TCP连接在路由表中抹去了。 注意它不会发送RST包给客户端也不会发给服务端。它只是默默地“忘掉”了这个连接。30分钟零1秒新的数据来了。 Sink线程拿起这个连接试图发送一条Insert SQL。 因为TCP连接在本地看来还是ESTABLISHED状态数据发包成功写入Socket Buffer。 但是数据包到了防火墙那里防火墙说“你谁啊我不认识你。” 直接丢弃Drop不给任何回应。Sink线程陷入了SocketOutputStream.write()或者读取响应的read()中。 它在等。 等什么等操作系统的TCP重传机制或者JDBC驱动层的Socket Timeout。 这一等可能就是15分钟Linux默认TCP重传甚至更久。这就是为什么反压会持续几分钟的原因线程被IO Block住了。直到超时抛出异常连接池剔除坏连接重新建立新连接系统才恢复正常。怎么验证这很难通过Metrics看到因为线程是卡在Native方法的。 你需要用netstat -ano | grep ESTABLISHED或者ss命令配合抓包。 但最简单的验证方法是改配置。保命手段开启TCP KeepAlive在操作系统层面和JDBC URL参数里都开启。但TCP KeepAlive默认间隔通常是2小时远水解不了近渴。连接池心跳Validation Query 这是最管用的。在你的连接池配置里必须设置testWhileIdle truetimeBetweenEvictionRunsMillis 60000(1分钟检查一次)minEvictableIdleTimeMillis 300000(5分钟不用就回收)最关键的validationQuery SELECT 1(或者JDBC 4的isValid())强迫连接池每隔几分钟就发个心跳包哪怕没数据也要通过网络层告诉防火墙“大哥我还在别杀我。”真实案例我曾经排查过一个写入ElasticSearch的作业也是每30分钟卡死。最后发现是云厂商的NAT网关对Idle连接的超时限制正好是1800秒。我们在RestClient里加了KeepAlive回调问题瞬间消失。第六章剖析黑盒——当Stack Trace遇到Arthas如果以上都是推测那现在我要教你如何抓现行。 当反压发生的那5分钟里TaskManager内部到底发生了什么别猜我们要看证据。传统的jstack虽然有用但对于短暂的5分钟往往你登上去敲完命令现场已经没了。而且jstack只能看到快照看不到CPU时间片到底花哪儿了。这里推荐用阿里开源的Arthas或者Async-Profiler。但为了生产环境安全不需要安装Agent我们用最简单的Flame Graph火焰图思维。6.1 抓住那只“吞噬CPU的怪兽”假设反压又开始了。第一步找到最忙的那个TaskManager的PID。top -H -p PID找到占用CPU最高的那个线程ID假设是hex 0x1a2b。第二步采样。 如果你有权限用async-profiler是最好的./profiler.sh -d 30 -f flamegraph.html PID采集30秒的CPU Profile。第三步分析火焰图。 打开生成的HTML。 找那座“平顶山”Plateau。 火焰图的X轴是占用CPU的时间比例。如果你看到一段很宽的条上面写着java.util.HashMap.resize()或者com.yourcompany.utils.ComplexRegex.match()或者org.rocksdb.RocksDB.get()如果是HashMap resize说明你在处理函数里疯狂创建大Map或者频繁扩容每30分钟一波数据洪峰导致频繁扩容。如果是Regex说明某条特殊的数据触发了你的正则回溯死循环ReDoS。如果是RocksDB get说明前面提到的IO问题还是没解决或者你的Key设计导致了BloomFilter失效每次都要读盘。6.2 奇怪的锁竞争有时候CPU不高但就是处理慢。这时候要看锁。 在Arthas里输入thread -b(找出当前阻塞其他线程的线程)你可能会发现这样一个恐怖场景 所有的SourceReader线程都在WAITING (parking)。 而被谁Block住了呢 可能是一个静态单例Static Singleton的工具类里面有个synchronized方法。 或者是因为Log4j 1.x 在高并发写日志时的锁竞争哪怕是AsyncAppender如果Queue满了也会阻塞。一定要查日志框架很多故障是因为每30分钟业务触发某种边缘case打印大量Error日志。log.error(..., e)同步写文件磁盘IO变慢Log4j锁住所有业务线程。验证方法直接把日志级别调成OFF看看反压还在不在。如果在那就是日志系统背锅。第七章隐形倾斜——不是所有的热点都写在脸上提到数据倾斜Data Skew是个写Flink的都能背两句加盐Salt、两阶段聚合。 但那种倾斜通常是持续性的。某个Key一直热某个Subtask一直忙。你遇到的可是“30分钟一次”的周期性反压。这意味着这个热点Key是间歇性爆发的。7.1 “脉冲式”热点Pulse Skew场景还原 上游业务系统有个定时任务Cron Job每30分钟把一批特定的状态数据推送到Kafka。 这批数据有一个共同特征比如它们的User ID都是0或者SYSTEM_ADMIN或者某个字段是空的。 平时正常用户的流量像细水长流经过keyBy(userId)后均匀分布在各个Subtask上。 但每到30分钟整那几十万条userId0的数据瞬间涌入。根据Hash算法hash(0)的结果是固定的。 这意味着这几十万条数据会全部挤进同一个Subtask里。 其他的Subtask在围观唯独这个Subtask被活活撑死。怎么确诊别看总体的Backpressure指标。你要点开Flink Web UI的该算子详情查看Subtasks列表。 一定要在反压发生的那5分钟里看 如果发现Subtask 3InPoolUsage 100%Backpressure High。Subtask 0, 1, 2, 4...InPoolUsage 0%Backpressure OK。这就是典型的单点脉冲倾斜。解决这种“定时炸弹”的绝技普通的加盐给Key加随机后缀虽然能解决但在需要精准去重或排序的场景下会很麻烦。 我推荐一种更轻量的“动态采样旁路”策略Metric埋点在KeyBy后的处理函数里加个简单的Counter统计每个Key的QPS。日志打印if (counter threshold) log.warn(Hot Key Detected: {}, key);发现凶手后如果确认是userId0这种无效数据或者是系统数据直接在Source端或者KeyBy之前把它Filter掉或者Rebalance到单独的侧输出流SideOutput去慢慢处理别让一颗老鼠屎坏了一锅汤。7.2 那个被忽视的null值千万别信业务方说“这个字段必填”。 在ETL链路里Null值往往是倾斜之源。 如果每30分钟有一批脏数据进来Join Key全是Null。 Flink默认会把Null当做一个普通的Key来Hash。 结果就是所有Null值数据全去了同一个Subtask。防御性编程.filter(data - data.getKey() ! null) // 简单粗暴 // 或者 .keyBy(data - { if (data.getKey() null) { return UUID.randomUUID().toString(); // 把Null打散 } return data.getKey(); })这行代码能救你一命。第八章序列化的代价——POJO vs Kryo 的暗战如果你的CPU在反压期间飙高但Logic代码很简单也没发生GC。那你要看看你的对象是怎么在这个分布式系统里传输的。Flink的数据在算子之间传输Network Shuffle时必须序列化。 Flink有一套非常高效的POJO Serializer。但前提是你的Java Bean必须符合POJO规范有空构造函数、Getter/Setter等。 一旦你的对象里包含了一些奇怪的类型比如未经注册的第三方库对象或者Object类型Flink就会退化使用Kryo序列化。Kryo虽然全能但比POJO Serializer慢10倍不止而且极其消耗CPU。这跟30分钟有什么关系假设每30分钟涌入的那批数据是一个复杂的、嵌套很深的JSON转换过来的大对象。或者这批数据里包含了某种多态类型。 平时的小对象POJO搞得定。 30分钟时的那批大对象触发了Kryo的深层递归序列化。 CPU瞬间被序列化操作吃光来不及处理业务逻辑。反压形成。核查手段在你的main函数最前面加上这行代码env.getConfig().disableGenericTypes();这就好比给系统装了个“报警器”。如果Flink发现无法使用高效序列化器必须回退到Kryo时它会直接抛出异常让程序报错。别怕报错。报错是好事。 看到报错信息你就能知道是哪个Class没写好。 然后要么把它改成标准的POJO要么手动注册TypeInfo。 把这个隐患消灭在萌芽状态别让它在生产环境每30分钟折磨你一次。第九章微批化的艺术——LocalKeyBy 与 MiniBatch流计算是逐条处理的但这在某些高吞吐场景下是极大的浪费。 每来一条数据都要序列化、网络传输、反序列化、拿State锁、读State、写State、释放锁。如果每30分钟有一波高峰QPS翻了10倍。这种逐条处理的开销Overhead就成了瓶颈。9.1 手写 LocalKeyBy本地预聚合在数据进行keyBy网络Shuffle之前先在本地的Map里攒一攒。 比如在Source或者是Map算子后面接一个BundleOperator。逻辑如下定义一个本地MapMapKey, Accumulator。数据来了先不发往下游而是更新本地Map。关键点当Map大小达到阈值比如1000条或者时间过去了一小会儿比如100ms。将Map里的聚合结果统一发往下游。效果原本30分钟那一波峰值可能有100万条记录需要Shuffle 100万次。 经过预聚合只要Key的重复率高可能只需要Shuffle 1万次。 网络压力瞬间降低99%。代码骨架干货public class LocalAggregator extends RichFlatMapFunctionEvent, Event { private transient MapString, Long buffer; private final int flushSize 1000; Override public void flatMap(Event value, CollectorEvent out) { buffer.merge(value.getKey(), 1L, Long::sum); if (buffer.size() flushSize) { flush(out); } } // 别忘了在 close() 里 flush 剩下的数据 Override public void close() { flush(out); // 伪代码实际要把Collector存起来 } }当然Flink SQL里只需要开启table.exec.mini-batch.enabled参数配好它底层就是这么干的。但在DataStream API里很多时候得自己撸。第十章最后的杀手锏——自定义分区器Custom Partitioner如果你的下游Sink也是一个分布式系统比如写入HDFS的不同Bucket或者写入Kafka的不同Partition。 而30分钟那一波数据业务逻辑上决定了它们都要去往同一个下游分区。比如按照时间切分Bucket写入HDFS。30分钟整所有数据都要写入2023-10-27-10-30这个文件夹。 不管你在Flink里怎么并行到了Sink端本质上是对同一个文件系统的目录在进行高并发写入或者Rename操作。 文件系统的NameNode锁竞争或者单目录下的文件数限制会反向卡住Flink的Sink。破局思路不要直接用KeyBy。使用partitionCustom。dataStream.partitionCustom(new PartitionerString() { Override public int partition(String key, int numPartitions) { // 这里的逻辑你自己定 // 比如检测到热点Key就轮询发送给不同的Subtask // 或者结合负载情况动态分发 return (key.hashCode() Integer.MAX_VALUE) % numPartitions; } }, keyField);通过完全控制数据流向你可以绕过那些拥堵的“收费站”。甚至可以实现一个“感知负载的分区器”如果发现下游Subtask 3的处理延迟高了可以通过共享变量或外部存储反馈就把流量切给Subtask 4。这属于高阶玩法慎用但救命时很管用。