怎么租域名做网站公司网站建设需要收集什么信息
2026/3/31 9:44:26 网站建设 项目流程
怎么租域名做网站,公司网站建设需要收集什么信息,wordpress 图片下加文字,ui界面设计英文大数据工程中的自动化数据质量检查 从 0 到 1 用 Apache Spark Great Expectations Airflow 打造可扩展、可复用的数据质量平台 目标读者与前置知识 目标读者需要具备的前置知识1#xff5e;3 年经验的大数据开发 / 数据平台工程师熟悉 Linux 命令行、Python 语法、SQL 基础…大数据工程中的自动化数据质量检查从 0 到 1 用 Apache Spark Great Expectations Airflow 打造可扩展、可复用的数据质量平台目标读者与前置知识目标读者需要具备的前置知识13 年经验的大数据开发 / 数据平台工程师熟悉 Linux 命令行、Python 语法、SQL 基础对数据治理、数据质量有痛点的产品或数仓同学了解 Hadoop 生态HDFS/Hive或云数据湖S3、OSS概念想将「人工对数」升级为「自动化质检」的团队能跑通最简单的 Spark job知道 DAG 调度是什么文章目录引言为什么“数据质量”总在救火问题背景与动机当数据量 人力肉眼极限核心概念与理论基础3.1 数据质量的 5 大维度3.2 自动化质检在数据管道的位置3.3 Great Expectations 架构速览环境准备10 分钟可复现的 Docker-Compose 栈分步实现5.1 步骤 1初始化 Great Expectations 项目5.2 步骤 2连接 Spark Hive Catalog5.3 步骤 3编写第一批 Expectations字段非空、主键唯一、分布漂移5.4 步骤 4把质检任务封装成 Airflow DAG5.5 步骤 5失败告警与企业微信 / Slack 集成5.6 步骤 6多表批量生成 Expectations模板化5.7 步骤 7数据质量分数可视化Superset 仪表盘关键代码解析与深度剖析6.1 Checkpoint 运行时序6.2 自定义 Expectation业务规则 DSL6.3 千万级表 Row-Level 异常采样策略结果展示与验证7.1 运行日志与失败报告示例7.2 性能基准1 TB 表 200 Expectations 耗时 8 min性能优化与最佳实践8.1 列式统计复用 增量检查8.2 并行度与分区裁剪8.3 元数据缓存与 Iceberg/Hudi 快照读常见问题与解决方案FAQ未来展望从“事后检查”到“事前防控”总结参考资料附录完整 docker-compose.yml GitHub 仓库1. 引言为什么“数据质量”总在救火“老板早上看到报表数字对不上第一反应永远是‘数据是不是又脏了’”在传统数仓时代数据量小、表少数据开发同学可以靠“肉眼 SQL 对数”——写几条COUNT(*)、GROUP BY就定位问题。进入大数据时代后表 5000 张字段 10 万列离线 实时双链路同一个指标可能来自 3 个不同作业业务变更频繁字段增减、枚举值变化无通知人肉对数彻底失效数据质量问题从“偶发”变成“常态”。本文要解决的正是如何把事后救火变成事前拦截把不可复现的人工检查变成可配置、可调度、可量化的自动化流程。2. 问题背景与动机当数据量 人力肉眼极限2.1 现有方案痛点方案优点痛点SQL 脚本 Cron简单无统一报告、无历史趋势、无权限隔离自研 Python 脚本灵活重复开发、无 UI、无版本管理商业 DQ 工具Informatica、Talend功能全贵、封闭、难二次开发数据湖格式自带统计Iceberg Metrics轻量仅文件级缺业务语义规则2.2 技术选型理由开源、可扩展、与 Spark 生态无缝集成、社区活跃 →Great ExpectationsGX统一调度、失败重试、告警 →Apache Airflow计算引擎 →Spark 3.4已支持 Python 3.11 ANSI SQL Mode可视化 →Superset原生支持 GX 元数据 SQL3. 核心概念与理论基础3.1 数据质量的 5 大维度Completeness完整性例order_id非空比例 ≥ 99.9%Uniqueness唯一性例主键user_iddt无重复Validity有效性例枚举值status ∈ [paid,cancelled]Consistency一致性例订单表amount汇总 支付表sum(money)误差 0.1%Timeliness时效性例业务 2 点前上传质检 3 点前完成超时即报警3.2 自动化质检在数据管道的位置┌──────────┐ ┌──────────┐ ┌──────────┐ | Ingest |--| Clean |--| Aggregate| | (Flume) | | (Spark) | | (Spark) | └──────────┘ └──────────┘ └──────────┘ ↓ ↓ ↓ Raw Zone Staging Zone Curated Zone ↓ ↓ ↓ ┌────────────────────────────────────────────┐ | 质检 Checkpoint | | 失败 → 发送告警 阻塞下游 | | 成功 → 写 data_quality_pass 标记 | └────────────────────────────────────────────┘经验质检任务必须阻塞下游否则“脏数据”一旦扩散修复成本指数级上升。3.3 Great Expectations 架构速览DataContext ├── Datasources (Spark, Pandas, SQL) ├── Expectations Suites (JSON) ├── Profilers (自动生成规则) ├── Checkpoints (运行时配置) ├── Validation Results (JSON → 可转存 S3) └── Data Docs (静态 HTML 报告)4. 环境准备10 分钟可复现的 Docker-Compose 栈全部镜像使用官方社区版Mac M1 / Linux x86 均测通。目录结构gx-spark-airflow/ ├── docker-compose.yml ├── spark-defaults.conf ├── great_expectations/ │ ├── great_expectations.yml │ └── expectations/ ├── dags/ │ └── dq_checkpoint_dag.py └── superset/ └── superset_config.py4.1 一键启动gitclone https://github.com/yourname/gx-spark-airflow.gitcdgx-spark-airflowdockercompose up -d# 默认拉起 7 个服务服务端口说明Airflow Web8080admin/adminSpark Master7077Spark History18080Jupyter Lab8888token:abcdSuperset8088admin/adminPostgres5432Airflow GX 元数据库MinIO9000S3 API存 Validation 结果4.2 软件版本锁定组件版本Spark3.4.1Hadoop 3Great Expectations0.17.9Airflow2.7.0Python3.11-slim5. 分步实现5.1 步骤 1初始化 Great Expectations 项目进入 Jupyter 容器dockerexec-it jupyterbash# 安装 GX CLIpipinstallgreat_expectations0.17.9 great_expectations init交互式向导选spark作为计算引擎s3作为元数据与结果存储endpoint 填http://minio:9000access_keyminioadmin完成后目录great_expectations/ ├── great_expectations.yml ├── expectations/ ├── checkpoints/ ├── plugins/ └── uncommitted/5.2 步骤 2连接 Spark Hive Catalog编辑great_expectations.yml追加datasources:spark_datasource:class_name:Datasourceexecution_engine:class_name:SparkDFExecutionEnginespark_config:spark.sql.catalogImplementation:hivehive.metastore.uris:thrift://hive-metastore:9083data_connectors:default_inferred_data_connector_name:class_name:InferredAssetSqlDataConnectorinclude_schema_name:true测试连通importgreat_expectationsasgx contextgx.get_context()context.test_yaml_config(yaml.dump(datasource_config))5.3 步骤 3编写第一批 Expectations以dwd_order表为例需求规则期望非空order_id,user_id,amount唯一order_id范围amount∈ [0.01, 50000]枚举status∈ [‘paid’,‘cancelled’,‘refund’]代码fromgreat_expectations.checkpointimportSimpleCheckpointfromgreat_expectations.core.batchimportBatchRequest batch_requestBatchRequest(datasource_namespark_datasource,data_connector_namedefault_inferred_data_connector_name,data_asset_namedwd_order,batch_spec_passthrough{reader_method:table},)suitecontext.create_expectation_suite(expectation_suite_namedwd_order_suite,overwrite_existingTrue)# 非空suite.add_expectation(gx.core.ExpectationConfiguration(expectation_typeexpect_column_values_to_not_be_null,kwargs{column:order_id},))# 唯一suite.add_expectation(gx.core.ExpectationConfiguration(expectation_typeexpect_column_values_to_be_unique,kwargs{column:order_id},))# 数值范围suite.add_expectation(gx.core.ExpectationConfiguration(expectation_typeexpect_column_values_to_be_between,kwargs{column:amount,min_value:0.01,max_value:50000},))# 枚举suite.add_expectation(gx.core.ExpectationConfiguration(expectation_typeexpect_column_values_to_be_in_set,kwargs{column:status,value_set:[paid,cancelled,refund]},))context.save_expectation_suite(suite)运行 CheckpointcheckpointSimpleCheckpoint(namedwd_order_checkpoint,data_contextcontext,batch_requestbatch_request,expectation_suite_namedwd_order_suite,action_list[{name:store_validation_result,action:{class_name:StoreValidationResultAction},},{name:update_data_docs,action:{class_name:UpdateDataDocsAction},},],)checkpoint_resultcheckpoint.run()成功会输出success: true, statistics: { evaluated_expectations: 4, successful_expectations: 4, success_percent: 100 }5.4 步骤 4把质检任务封装成 Airflow DAGdags/dq_checkpoint_dag.pyfromairflowimportDAGfromairflow.operators.bashimportBashOperatorfromdatetimeimportdatetime,timedelta default_args{owner:data_team,depends_on_past:False,retries:2,retry_delay:timedelta(minutes5),}withDAG(dag_iddq_dwd_order,default_argsdefault_args,start_datedatetime(2023,10,1),schedule_interval0 2 * * *,# 每天 02:00 跑catchupFalse,)asdag:# 使用 GX CLI 运行 checkpointdq_taskBashOperator(task_idrun_gx_checkpoint,bash_commandcd /opt/great_expectations great_expectations checkpoint run dwd_order_checkpoint,)把great_expectations目录挂载到 Airflow 容器volumes:-./great_expectations:/opt/great_expectations重启 AirflowWeb 里开启 DAG手动 trigger 一次观察日志。5.5 步骤 5失败告警与企业微信 / Slack 集成在great_expectations.yml末尾加validation_operators:action_list_operator:class_name:ActionListValidationOperatoraction_list:-name:send_wechataction:class_name:custom_plugins.WeChatWebhookActionwebhook_url:https://qyapi.weixin.qq.com/cgi-bin/webhook/send?keyxxx自定义插件plugins/wechat_action.py核心片段fromgreat_expectations.validation_operatorsimportValidationActionimportrequestsclassWeChatWebhookAction(ValidationAction):def_run(self,validation_result_suite,validation_result_suite_identifier,**kwargs):successvalidation_result_suite.success run_idvalidation_result_suite.meta[run_id]ifnotsuccess:msgf❌ 数据质量检查失败run_id{run_id}requests.post(self.webhook_url,json{msgtype:text,text:{content:msg}})5.6 步骤 6多表批量生成 Expectations模板化公司有 100 张业务表手工写 JSON 不现实。GX 提供OnboardingDataAssistantfromgreat_expectations.datasource.fluent.interfacesimportDatasourcefromgreat_expectations.core.batchimportBatchRequestfromgreat_expectations.rule_based_profilerimportRuleBasedProfiler batch_requestBatchRequest(datasource_namespark_datasource,data_asset_namedwd_order,)profilerRuleBasedProfiler(nameonboarding,config_version1.0,variables{},rules{},)suiteprofiler.run(batch_requestbatch_request)context.save_expectation_suite(suite)经验自动生成后务必人工 review再上线避免“过度期望”导致误报。5.7 步骤 7数据质量分数可视化Superset 仪表盘GX 每次运行会把结果写进 Postgres 表ge_validations.Superset 里新建 SQL 数据集SELECTexpectation_suite_name,success::int*100.0/evaluated_expectationsASsuccess_rate,run_timeFROMge_validationsWHEREcreated_atcurrent_date-7用 Line Chart 展示“最近 7 天质量分数趋势”挂到仪表盘首页让老板一眼看到“数据健康度”。6. 关键代码解析与深度剖析6.1 Checkpoint 运行时序Spark 读取表 → DataFrameGX 把 Expectations 翻译成 Catalyst 逻辑计划下推列统计如max(amount)结果写回validation_resultsJSONData Docs 生成静态 HTML推送 MinIO生命周期 90 天自动回收6.2 自定义 Expectation业务规则 DSL例订单金额小数位 ≤ 2fromgreat_expectations.coreimportExpectationConfigurationfromgreat_expectations.expectations.custom_expectationimportCustomExpectationfrompyspark.sql.functionsimportregexp_extractclassExpectColumnValuesDecimalPlaces(CustomExpectation):metric_dependencies(column_values.decimal_places,)success_keys(max_places,)defvalidate_configuration(self,configuration:ExpectationConfiguration):assertcolumninconfiguration.kwargsassertmax_placesinconfiguration.kwargsdef_validate(self,metrics):placesmetrics[column_values.decimal_places]max_placesself.configuration.kwargs[max_places]returnplacesmax_places注册到plugins/后即可在 Suite 里使用suite.add_expectation(ExpectationConfiguration(expectation_typeexpect_column_values_decimal_places,kwargs{column:amount,max_places:2},))6.3 千万级表 Row-Level 异常采样策略若一张表 10 亿行全量扫描成本不可接受。GX 支持mostly参数99 % 通过即可sample子句只扫 100 万行partition_by按 dt 分区只扫最近 3 天示例batch_requestBatchRequest(datasource_namespark_datasource,data_asset_namedwd_order,batch_spec_passthrough{reader_options:{predicates:[dt 2023-10-01,dt 2023-10-03]}},)7. 结果展示与验证7.1 运行日志与失败报告示例失败场景枚举值新增pending导致expect_column_values_to_be_in_set不通过。GX Data Docs 自动生成列失败值计数占比statuspending12,3450.8 %7.2 性能基准表大小行数Expectations耗时集群规格1 TB5.2 B2008 min20 × m5.xlarge (8 vCPU, 32 GB)优化手段列统计复用一次agg计算多个期望增量检查Icebergsnapshot-id对比仅扫新增文件动态并发spark.sql.shuffle.partitions min(400, 表大小/128 MB)8. 性能优化与最佳实践Completeness vs. Timeliness 权衡若表 10 亿行允许采样 1 %可把 30 min 降到 3 min业务可接受即可。把“业务规则”与“技术规则”分层技术规则非空、唯一统一模板业务规则金额小数位放 Git 代码Code Review 强制合并。失败分级BLOCKER主键重复 → 阻塞下游WARN枚举值新增 → 只报警不阻塞用 GXaction_list的validation_operator区分处理。元数据与数据分离Validation 结果写对象存储不占用 Hive 空间生命周期 90 天自动删。版本管理Expectation Suite 放 Git每次修改走 MR回滚只需git revert.9. 常见问题与解决方案FAQ问题根因解决Great Expectations找不到 Hive 表metastore 地址未配置spark.sql.catalogImplementationhiveSpark OOM采样比例过高调低mostly或增加sampleAirflow 报great_expectations not found镜像缺包在requirements-airflow.txt加great_expectations0.17.9MinIO 证书错误用 HTTP 自签加环境变量AWS_CA_BUNDLE中文列名失败GX 默认正则[a-zA-Z0-9_]改expect_column_names_to_match_regex或重命名列10. 未来展望从“事后检查”到“事前防控”Schema Registry Kafka在数据接入时就对字段类型、枚举值做校验不让脏数据进湖。ML 异常检测用 Facebook Prophet 或 AWS Deequ 的ApproxQuantiles自动发现趋势突变。Data Contract上游团队签署“数据契约”违反即回滚版本实现“左移”理念。DataOps 平台化把 GX、Airflow、Superset、Catalog 集成到统一门户一键订阅质检报告。11. 总结我们完整走过了“环境搭建 → 单表质检 → 调度集成 → 可视化 → 性能优化”全链路。你现在可以用 GX Spark 在分钟级完成 TB 级数据质量检查通过 Airflow 把质检嵌入日常管道失败即阻塞、自动告警用 Superset 让业务方实时看到数据健康度不再“拍脑袋”决策数据质量不再是“事后救火”而是可度量、可复盘、可改进的工程化环节。希望本文能帮你把“数据质量”从运维痛点转变为团队信任基石。12. 参考资料Great Expectations 官方文档https://docs.greatexpectations.io/Spark 3.4 Performance Tuning Guidehttps://spark.apache.org/docs/latest/sql-performance-tuning.htmlAWS Deequ Paper《Automating Large-Scale Data Quality Verification》Airbnb Data Quality with Airflowhttps://medium.com/airbnb-engineering/dq-airbnb-4f8d8e7f4b7c《Data Quality Fundamentals》O’Reilly 202213. 附录A. 完整 docker-compose.yml节选version:3.8services:spark-master:image:apache/spark:3.4.1command:/opt/spark/sbin/start-master.shports:-7077:7077-8080:8080jupyter:image:jupyter/pyspark-notebook:python-3.11volumes:-./great_expectations:/home/jovyan/gxenvironment:SPARK_MASTER_URL:spark://spark-master:7077airflow:image:apache/airflow:2.7.0-python3.11volumes:-./dags:/opt/airflow/dags-./great_expectations:/opt/great_expectationsenvironment:AIRFLOW__CORE__EXECUTOR:LocalExecutorB. GitHub 仓库含全部代码 SQLhttps://github.com/yourname/gx-spark-airflow记得顺手点个 ⭐方便后续更新“数据质量不是一次项目而是持续运营。”——祝你早日让团队告别“数据背锅”用工程化手段把风险消灭在源头

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询