2026/3/5 1:04:58
网站建设
项目流程
iis 网站制作,大都会app最新版本下载,wordpress文章支付可见,微信小程序快速赚50元Doris 与 Flink 整合实战#xff1a;构建实时计算分析平台
引言
痛点引入#xff1a;实时数据处理的“最后一公里”难题
在数字化时代#xff0c;实时性已经成为企业竞争的核心壁垒——电商需要实时监控订单波动#xff0c;物流需要实时追踪包裹位置#xff0c;游戏需要…Doris 与 Flink 整合实战构建实时计算分析平台引言痛点引入实时数据处理的“最后一公里”难题在数字化时代实时性已经成为企业竞争的核心壁垒——电商需要实时监控订单波动物流需要实时追踪包裹位置游戏需要实时统计玩家行为。但很多团队在搭建实时系统时都会遇到一个共性问题“计算很快但分析很慢”比如你用Flink实时处理了用户点击流计算出每5分钟的UV独立访客但结果存到了HBase或MySQL里。当业务人员想基于这些数据做实时Dashboard时却发现HBase不支持复杂SQL查询得写MapReduce才能统计MySQL面对千万级数据的聚合查询比如“按省份分组查UVTop10”延迟能高达几十秒若要做多维分析比如“同时按时间、地域、商品分类查询”要么得预计算大量中间表要么得接受“查一次等半分钟”的尴尬。这就是实时数据处理的“最后一公里”实时计算的结果无法被高效分析。而解决这个问题的关键在于找到一个能衔接“实时计算”与“实时分析”的桥梁。解决方案概述Flink Doris让数据“算完就能用”DorisApache Doris原百度Palo是一款MPP架构的实时分析型数据库主打“高吞吐写入、低延迟查询、复杂SQL支持”而Flink是业内公认的实时计算引擎天花板擅长处理流数据的低延迟计算。当两者结合时能形成一条端到端的实时数据链路数据接入从Kafka、Binlog、IoT设备等数据源获取实时流实时计算用Flink做清洗、转换、聚合比如计算UV、订单金额、用户留存实时写入通过Flink Doris Connector将计算结果实时写入Doris实时分析用Doris的SQL做多维分析、即席查询或对接Grafana/Tableau生成实时Dashboard。这个方案的核心优势是低延迟Flink的计算延迟在毫秒级Doris的查询延迟在毫秒到秒级高吞吐Doris支持每秒十万级别的写入Flink支持百万级别的并发处理易扩展两者都是分布式架构能通过增加节点线性扩展能力简化链路无需中间存储比如HBase/MySQL直接从计算到分析减少数据移动。最终效果展示我们将通过实战搭建一个电商实时UV分析系统最终实现Flink实时计算每5分钟的UV独立访客结果实时写入Doris用Doris SQL查询“近1小时各时间段的UV趋势”用Grafana展示实时Dashboard如下图。注此为模拟图实际效果需自行搭建准备工作在开始实战前需要完成环境搭建和基础知识储备。1. 环境与工具清单工具/组件版本要求说明Apache Flink1.17实时计算引擎推荐使用1.17及以上版本支持更完善的ConnectorApache Doris2.1实时分析数据库2.0版本支持更优的Stream Load性能Java1.8/11Flink和Doris均依赖JavaMaven/Gradle3.6构建Flink作业的依赖管理工具Kafka可选2.8模拟实际场景的数据源若没有Kafka可用Flink的DataGenSource替代Grafana可选9.0可视化Dashboard工具2. 依赖配置Flink作业要让Flink能写入Doris需在pom.xmlMaven或build.gradleGradle中添加Flink Doris Connector依赖Maven配置dependencies!-- Flink Table API 基础依赖 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- Flink Doris Connector --dependencygroupIdorg.apache.doris/groupIdartifactIdflink-doris-connector-1.17/artifactIdversion1.4.0/version/dependency!-- 可选Kafka Connector若用Kafka做数据源 --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependency/dependenciesGradle配置dependencies{implementationorg.apache.flink:flink-table-api-java-bridge:${flink.version}implementationorg.apache.doris:flink-doris-connector-1.17:1.4.0implementationorg.apache.flink:flink-connector-kafka:${flink.version}}3. 基础知识储备为了顺利完成实战你需要了解以下概念Flink基础DataStream API/Table API、Watermark水位线、Checkpoint checkpoint、窗口函数Tumble Window/滑动窗口Doris基础表结构Duplicate Key/Unique Key/Aggregate Key、分区Partition、分桶Bucket、Stream Load流式写入协议实时计算概念流处理vs批处理、Exactly-Once精确一次语义。核心步骤从0到1搭建实时分析平台我们将按**“环境部署→数据准备→Flink计算→写入Doris→实时分析”**的流程逐步实现。步骤1快速部署Doris测试集群Doris支持多种部署方式二进制、Docker、K8s这里推荐Docker Compose快速搭建测试集群生产环境建议用二进制部署。1.1 安装Docker Compose确保你的机器已安装Docker和Docker Compose若未安装参考Docker官方文档。1.2 编写Docker Compose文件创建docker-compose.yml内容如下基于Doris 2.1版本version:3services:doris-fe:image:apache/doris:2.1.0-fe-x86_64container_name:doris-feports:-8030:8030# FE的HTTP端口用于Flink连接-9030:9030# FE的MySQL协议端口用于SQL客户端连接environment:-FE_SERVERSdoris-fe1:10.0.2.15:9010-FE_ID1volumes:-./doris-fe/data:/opt/apache-doris/fe/doris-meta-./doris-fe/log:/opt/apache-doris/fe/logdoris-be:image:apache/doris:2.1.0-be-x86_64container_name:doris-beports:-8040:8040# BE的HTTP端口depends_on:-doris-feenvironment:-BE_ADDRdoris-be1:10.0.2.16:9060-FE_SERVERSdoris-fe:10.0.2.15:9030volumes:-./doris-be/data:/opt/apache-doris/be/storage-./doris-be/log:/opt/apache-doris/be/log1.3 启动Doris集群在docker-compose.yml所在目录执行docker-composeup-d启动成功后可通过以下方式验证访问FE的Web UIhttp://localhost:8030默认用户名root密码为空用MySQL客户端连接FEmysql -h localhost -P 9030 -u root -p密码为空。步骤2准备实时数据源为了模拟真实场景我们用Flink的DataGenSource生成模拟的电商用户行为数据若你有Kafka集群也可以用Kafka作为数据源。2.1 定义用户行为数据结构用户行为数据包含以下字段字段名类型说明user_idBIGINT用户IDitem_idBIGINT商品IDcategory_idBIGINT商品分类IDbehaviorSTRING用户行为click/order/paytsTIMESTAMP(3)行为发生时间2.2 用Flink Table API创建数据源表在Flink SQL客户端或代码中创建user_behavior表用DataGenSource生成模拟数据CREATETABLEuser_behavior(user_idBIGINT,item_idBIGINT,category_idBIGINT,behavior STRING,tsTIMESTAMP(3),-- 定义Watermark处理乱序数据延迟5秒WATERMARKFORtsASts-INTERVAL5SECOND)WITH(connectordatagen,-- 用DataGenSource生成数据rows-per-second100,-- 每秒生成100条数据fields.user_id.min1,-- user_id的最小值fields.user_id.max10000,-- user_id的最大值fields.behavior.length1-- behavior字段的长度这里用枚举值后续过滤);步骤3Flink实时计算——计算每5分钟UVUV独立访客是电商最核心的实时指标之一我们需要计算每5分钟内的独立用户数。3.1 窗口函数选择UV计算需要按时间窗口聚合这里选择滚动窗口Tumble Window——每5分钟一个窗口窗口不重叠。3.2 编写Flink计算SQL用Flink Table API编写计算逻辑-- 计算每5分钟的UV仅统计click行为CREATEVIEWreal_time_uv_viewASSELECTTUMBLE_START(ts,INTERVAL5MINUTE)ASwindow_start,-- 窗口开始时间TUMBLE_END(ts,INTERVAL5MINUTE)ASwindow_end,-- 窗口结束时间COUNT(DISTINCTuser_id)ASuv-- 独立用户数FROMuser_behaviorWHEREbehaviorclick-- 仅统计点击行为GROUPBYTUMBLE(ts,INTERVAL5MINUTE);-- 按时间滚动窗口分组3.3 验证计算逻辑在Flink SQL客户端执行SELECT * FROM real_time_uv_view可以看到类似以下的结果window_startwindow_enduv2024-05-20 10:00:002024-05-20 10:05:001232024-05-20 10:05:002024-05-20 10:10:00456步骤4整合Flink与Doris——实时写入计算结果接下来我们需要将Flink计算出的UV结果实时写入Doris这是整个链路的核心环节。4.1 在Doris中创建目标表首先在Doris中创建用于存储UV结果的表real_time_uv。注意选择Duplicate Key表类型适合存储原始聚合结果支持更新按window_start窗口开始时间分区方便按时间范围查询按window_start分桶均匀分布数据。执行以下SQL通过MySQL客户端连接DorisCREATEDATABASEIFNOTEXISTStest;-- 创建测试数据库USEtest;CREATETABLEreal_time_uv(window_startDATETIME,-- 窗口开始时间window_endDATETIME,-- 窗口结束时间uvBIGINT-- 独立访客数)DUPLICATEKEY(window_start,window_end)-- 主键用于去重PARTITIONBYRANGE(window_start)(-- 按时间分区START(2024-01-01)END(2024-12-31)EVERY(INTERVAL1DAY))DISTRIBUTEDBYHASH(window_start)BUCKETS10-- 分桶10个桶PROPERTIES(replication_num1,-- 副本数测试环境设为1in_memorytrue-- 内存中存储加速查询);4.2 在Flink中配置Doris Sink在Flink中创建doris_sink表用于将计算结果写入Doris。关键配置参数说明参数说明connector固定为doris指定使用Flink Doris ConnectorfenodesDoris FE的HTTP地址格式FE_IP:FE_HTTP_PORT比如localhost:8030usernameDoris的用户名默认rootpasswordDoris的密码默认空table.identifierDoris的表标识符格式数据库名.表名比如test.real_time_uvsink.properties.format写入数据的格式支持json或csv这里用jsonsink.properties.strip_outer_array去除JSON数组的外层Flink输出的是数组Doris需要单条JSONsink.enable-delete是否支持删除可选默认falseFlink SQL配置CREATETABLEdoris_sink(window_startTIMESTAMP(3),window_endTIMESTAMP(3),uvBIGINT,PRIMARYKEY(window_start,window_end)NOTENFORCED-- 主键与Doris表对齐)WITH(connectordoris,fenodeslocalhost:8030,usernameroot,password,table.identifiertest.real_time_uv,sink.properties.formatjson,sink.properties.strip_outer_arraytrue,sink.parallelism2-- 写入Doris的并行度根据集群规模调整);4.3 启动Flink写入任务执行INSERT INTO语句将real_time_uv_view的结果写入doris_sinkINSERTINTOdoris_sinkSELECT*FROMreal_time_uv_view;4.4 验证数据写入在Doris中执行查询确认数据已写入SELECTwindow_start,window_end,uvFROMtest.real_time_uvORDERBYwindow_startDESCLIMIT10;若看到类似以下结果说明写入成功window_startwindow_enduv2024-05-20 10:30:002024-05-20 10:35:007892024-05-20 10:25:002024-05-20 10:30:006542024-05-20 10:20:002024-05-20 10:25:00321步骤5Doris实时分析——从数据到价值Doris的核心优势是复杂SQL的低延迟查询。我们可以基于real_time_uv表做多种实时分析。5.1 实时查询近1小时的UV趋势查询最近1小时内每5分钟的UV变化SELECTwindow_start,uvFROMtest.real_time_uvWHEREwindow_startNOW()-INTERVAL1HOUR-- 过滤最近1小时的数据ORDERBYwindow_startASC;5.2 多维分析按天统计UV总和若要统计每天的总UV只需在window_start上做日期截断SELECTDATE(window_start)ASdt,-- 截断到日期SUM(uv)AStotal_uv-- 求和FROMtest.real_time_uvGROUPBYdtORDERBYdtDESC;5.3 实时Dashboard用Grafana可视化为了让业务人员更直观地看到数据我们用Grafana对接Doris生成实时Dashboard。5.3.1 安装Grafana Doris插件Grafana社区提供了Doris的数据源插件grafana-doris-datasource安装步骤下载插件git clone https://github.com/apache/doris/tree/master/grafana-datasource将插件复制到Grafana的插件目录比如/var/lib/grafana/plugins重启Grafanasudo systemctl restart grafana-server。5.3.2 配置Doris数据源在Grafana中添加数据源点击左侧菜单栏的“Configuration”→“Data Sources”点击“Add data source”搜索“Doris”配置参数Name数据源名称比如DorisFENodesDoris FE的HTTP地址比如http://localhost:8030UsernameDoris用户名rootPasswordDoris密码空点击“Save Test”验证连接成功。5.3.3 制作实时UV Dashboard点击左侧菜单栏的“Create”→“Dashboard”点击“Add panel”选择“Table”或“Time series”图表编写SQL查询比如“近1小时的UV趋势”SELECTwindow_startAStime,-- Grafana需要time字段作为时间轴uvASUVFROMtest.real_time_uvWHEREwindow_start$__timeFrom()ANDwindow_start$__timeTo()-- 时间范围变量ORDERBYwindow_startASC;调整图表样式比如选择“Line”类型设置刷新间隔为“5s”保存Dashboard完成原理深入Flink Doris的“实时”是如何实现的为了让你不仅“会用”更“懂原理”我们深入讲解Flink Doris Connector的工作机制和Exactly-Once语义的实现。1. Flink Doris Connector的工作原理Flink Doris Connector基于Doris的Stream Load协议实现数据写入。Stream Load是Doris的高吞吐写入接口支持每秒十万级别的数据写入且延迟在毫秒级。Connector的工作流程数据分片Flink将数据分成多个分片由sink.parallelism控制并行度数据缓存每个分片将数据缓存到内存中默认缓存大小为1MB可通过sink.batch.size调整批量写入当缓存达到阈值或超时sink.batch.interval时将数据打包成JSON数组通过Stream Load协议发送到Doris结果确认Doris返回写入结果Flink根据结果确认是否提交Checkpoint保证Exactly-Once。2. Exactly-Once语义的实现Exactly-Once是实时系统的关键需求——数据不丢不重。Flink Doris的Exactly-Once通过以下机制实现Flink CheckpointFlink定期生成Checkpoint记录当前的处理位置Doris事务Doris的Stream Load支持事务通过label参数唯一标识一个事务两阶段提交2PC准备阶段Flink将数据写入Doris的临时目录不提交事务提交阶段Flink完成Checkpoint后通知Doris提交事务回滚阶段若Flink任务失败从最近的Checkpoint恢复重新写入数据Doris会自动忽略重复的label。要开启Exactly-Once需要在Flink中配置CheckpointStreamExecutionEnvironmentenvStreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(60000);// 每60秒做一次Checkpointenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 开启Exactly-Once3. 性能优化技巧为了让系统达到最佳性能你可以调整以下参数参数优化方向sink.batch.size增大缓存大小比如从1MB调整到10MB减少Stream Load的次数降低延迟sink.batch.interval增大超时时间比如从1s调整到5s合并更多数据写入sink.parallelism增加并行度比如从2调整到10提高写入吞吐doris.be.number增加Doris BE的数量生产环境建议至少3个提高查询性能doris.table.buckets调整分桶数比如从10调整到20均匀分布数据总结与扩展回顾要点通过本文的实战你已经掌握了Flink Doris整合的核心流程部署环境用Docker快速启动Doris集群配置Flink依赖准备数据用DataGenSource生成模拟用户行为数据实时计算用Flink的窗口函数计算每5分钟的UV写入Doris配置Flink Doris Connector将结果实时写入Doris实时分析用Doris SQL做多维查询用Grafana可视化。常见问题FAQQ1Flink任务启动失败提示“无法连接到Doris FE”检查fenodes参数是否正确比如localhost:8030注意不是MySQL端口9030检查Doris FE是否在运行docker ps看doris-fe容器状态检查防火墙是否开放了8030端口sudo ufw allow 8030。Q2写入Doris的数据查询不到检查Flink任务是否在运行flink list看任务状态检查Doris的分区是否正确比如window_start是否在分区范围内检查Flink的watermark设置是否合理比如延迟5秒是否数据还没到窗口结束时间。Q3查询延迟高怎么办调整Doris表的in_memory参数为true内存中存储增加Doris BE的数量提高查询并行度为高频查询创建Rollup表预聚合加速查询CREATEROLLUP real_time_uv_rollup(window_start,uv)FROMreal_time_uvKEY(window_start)DISTRIBUTEDBYHASH(window_start)BUCKETS10;下一步构建更完整的实时数据平台本文的实战是一个基础的实时分析系统你可以在此基础上扩展更复杂的场景对接Kafka用Kafka作为数据源更贴近实际生产多流Join用Flink做双流Join比如用户行为流与商品信息流Join实时数仓结合Hudi/Deltalake构建实时数仓Doris作为数仓的分析层告警系统用Flink的CEP复杂事件处理检测异常数据触发告警比如UV骤降。结语Flink Doris的整合解决了实时数据处理的“最后一公里”问题——让计算结果能被快速分析。无论是电商的实时监控、物流的实时追踪还是游戏的实时统计这个组合都能胜任。通过本文的实战你已经掌握了搭建实时分析平台的核心能力。接下来不妨尝试将其应用到你的实际项目中让数据真正“活”起来如果在实践中遇到问题欢迎在评论区交流或参考以下资源Doris官方文档https://doris.apache.org/Flink官方文档https://flink.apache.org/Flink Doris Connector文档https://doris.apache.org/docs/dev/ecosystem/flink-doris-connector/最后祝愿你在实时数据的世界里越走越远