电子商务旅游网站建设策划书深圳建设网站过程
2026/4/18 4:11:37 网站建设 项目流程
电子商务旅游网站建设策划书,深圳建设网站过程,如何做国际网站产品宣传,如何申请免费企业邮箱1. TableEnvironment 是什么#xff0c;解决什么问题 Flink 的 Table/SQL 是声明式 API#xff0c;你写的 DDL、SQL、Table API 链式调用#xff0c;本质上都是在描述“要什么”#xff0c;由 Flink Planner 生成执行计划并提交作业执行。 TableEnvironment 的职责可以理解…1. TableEnvironment 是什么解决什么问题Flink 的 Table/SQL 是声明式 API你写的 DDL、SQL、Table API 链式调用本质上都是在描述“要什么”由 Flink Planner 生成执行计划并提交作业执行。TableEnvironment 的职责可以理解为三件事定义与管理元数据临时表/视图、Catalog 表/视图、函数、模块构建与解释计划sql_query/explain_sql、Table.explain、StatementSet.explain触发执行与作业配置execute_sqlDML、StatementSet.execute、Table.execute_insert配置并行度/容错等2. 创建 TableEnvironment 的两种常用方式2.1 推荐方式EnvironmentSettings 创建更纯粹的 Table/SQL 程序适用于主要用 Table API / SQL不强依赖 DataStream API。frompyflink.commonimportConfigurationfrompyflink.tableimportEnvironmentSettings,TableEnvironment configConfiguration()config.set_string(execution.buffer-timeout,1 min)env_settings(EnvironmentSettings.new_instance().in_streaming_mode().with_configuration(config).build())table_envTableEnvironment.create(env_settings)要点in_streaming_mode()常用于流作业你也可以按需要选择 batch 模式with_configuration(config)可以把一些执行参数提前塞进去例如 buffer-timeout2.2 与 DataStream 互操作StreamTableEnvironment需要 DataStream 能力时适用于你想在同一条链路里混用 DataStream 与 Table/SQL例如自定义 Source、复杂 ProcessFunction、Table 负责聚合与 Join。frompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.tableimportStreamTableEnvironment envStreamExecutionEnvironment.get_execution_environment()table_envStreamTableEnvironment.create(env)经验建议如果你不需要 DataStream 侧能力就用纯 TableEnvironment更简单如果你要用 DataStream 的 source/sink 或算子就用 StreamTableEnvironment3. Table/SQL 操作建表、视图、查询与执行最常用 API这一组 API 就是日常“写作业”的主流程。3.1 从 Python 数据构造 Tablefrom_elements / from_pandas快速构造测试数据、单元测试、demo 都很好用。from_elements(elements, schemaNone, verify_schemaTrue)from_pandas(pdf, schemaNone, split_num1)例子tabletable_env.from_elements([(1,Hi),(2,Hello)],[id,data])3.2 取出已注册对象from_pathfrom_path(path)用来把 Catalog/临时对象转成 Tablettable_env.from_path(sql_source)它也是替代旧 APIscan()的推荐写法。3.3 注册与管理视图create_temporary_view / create_viewcreate_temporary_view(view_path, table)临时视图会话级create_view(view_path, table, ignore_if_existsFalse)持久视图取决于 Catalog 是否持久化典型用法把 Table API 产物暴露给 SQL 使用。table_env.create_temporary_view(table_api_table,table)table_env.execute_sql(INSERT INTO sink SELECT * FROM table_api_table)3.4 注册与管理表create_temporary_table / create_table两类对象别混Temporary Table临时表常用于 source/sink作业脚本里即建即用Catalog Table持久化表通常配合 HiveCatalog、JDBC Catalog 等3.5 execute_sql 与 sql_query一个“执行”一个“拿 Table”execute_sql(stmt)执行单条语句DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USEsql_query(query)把 SQL 查询结果转成Table用于继续 Table API 链式处理示例ttable_env.sql_query(SELECT id, SUM(v) AS s FROM T GROUP BY id)关于 DML 的一个关键点execute_sql(INSERT INTO ...)通常是异步提交远端集群场景更符合预期本地 mini cluster / IDE 调试时经常需要.wait()等待结束4. 作业执行与解释计划explain_sql / StatementSet / 多 Sink4.1 explain_sql快速看 SQL 的 AST 与执行计划当你想确认优化有没有生效谓词下推、投影裁剪、Join 策略等优先用plantable_env.explain_sql(SELECT ...)print(plan)4.2 StatementSet一次作业写多个 Sink生产很常用你有多个下游例如同时写 Kafka、ES、Blackhole/Print Debug别拆多个作业用 StatementSetstatement_settable_env.create_statement_set()statement_set.add_insert_sql(INSERT INTO sink1 SELECT ...)statement_set.add_insert_sql(INSERT INTO sink2 SELECT ...)statement_set.execute().wait()优点复用同一份 source 与计算链路只提交一个 job运维更简单4.3 废弃 API 迁移建议少走弯路这些在新代码里建议不要再用旧 API推荐替代scanfrom_pathregister_tablecreate_temporary_viewsql_updateexecute_sqlexplain(table…)explain_sql / Table.explain / StatementSet.explainexecute(job_name)execute_sql / StatementSet.execute / Table.execute_insert5. UDF 管理Python/Java 函数注册与优先级TableEnvironment 可以直接注册函数也可以用 SQL 注册CREATE FUNCTION。常用接口create_temporary_function(path, function)临时 Catalog 函数create_temporary_system_function(name, function)临时 System 函数create_java_function(path, function_class_name, ...)注册 Java 函数到 Catalogcreate_java_temporary_function(...)/create_java_temporary_system_function(...)一个实用规则临时 system function如果和临时 catalog function同名system 的优先级更高更“全局”。删除相关drop_functiondrop_temporary_functiondrop_temporary_system_function6. Python 依赖管理让 UDF 在集群里也能找到你的包很多 PyFlink 作业跑在集群时失败不是逻辑错而是 UDF worker 找不到依赖。TableEnvironment 提供了三套常用方式6.1 add_python_file加单个文件/包/目录到 PYTHONPATHtable_env.add_python_file(/path/to/my_udfs.py)6.2 set_python_requirements用 requirements.txt 安装三方依赖table_env.set_python_requirements(requirements_file_path/path/requirements.txt,requirements_cache_dir/tmp/pyflink_cache)适合numpy/pandas/requests 等依赖要在 worker 端安装。6.3 add_python_archive分发压缩包并解压到 worker 工作目录table_env.add_python_archive(/path/my_dep.zip,target_dirdeps)适合你打包了模型文件、词典、配置等资源worker 端需要解压使用。7. 配置入口get_config并行度、作业名、执行语义都在这table_env.get_config()是你调参的总入口。最常见的两类设置7.1 常规配置并行度、作业名table_env.get_config().set(parallelism.default,8)table_env.get_config().set(pipeline.name,my_first_job)7.2 容错与状态StateBackend、Checkpoint、Restart Strategy现在推荐在 TableConfig 里配置而不是只在 StreamExecutionEnvironment 配。# 重启策略fixed-delaytable_env.get_config().set(restart-strategy.type,fixed-delay)table_env.get_config().set(restart-strategy.fixed-delay.attempts,3)table_env.get_config().set(restart-strategy.fixed-delay.delay,30s)# CheckpointExactly-oncetable_env.get_config().set(execution.checkpointing.mode,EXACTLY_ONCE)table_env.get_config().set(execution.checkpointing.interval,3min)# StateBackendrocksdb / hashmaptable_env.get_config().set(state.backend.type,rocksdb)table_env.get_config().set(execution.checkpointing.dir,file:///tmp/checkpoints/)经验建议开 RocksDB 必须配 checkpoint dir尤其是文件系统路径/对象存储路径流作业一定要把 checkpoint 周期、超时、并发数等补齐到生产标准这里只示例核心项8. Catalog 与 Module多环境、多库、多函数体系的关键当你接入 HiveCatalog、JDBC Catalog 或自研 Catalog 时以下 API 很重要Catalogregister_catalog / get_catalog / use_catalogDatabaseuse_database / get_current_database列举对象list_tables / list_views / list_functions / list_catalogs / list_databases ...Moduleload_module / unload_module / use_modules / list_modules典型场景你希望 SQL 里不写全限定名catalog.db.table就用use_catalog/use_database设置默认命名空间你希望扩展函数解析顺序就用 module 管理9. 快速工作流模板写 PyFlink Table 作业的标准姿势你可以把自己的项目按这个顺序组织清晰且可维护1创建 TableEnvironmentstreaming/batch config2设置 TableConfig并行度、checkpoint、statebackend、作业名3注册 catalog / use catalog / use database可选4注册 source/sinkDDL 或 TableDescriptor5注册 UDF 依赖如有6sql_query 拿 Table 或 Table API 构建链路7execute_sql INSERT / Table.execute_insert / StatementSet.execute 提交执行8必要时 explain_sql/table.explain 看计划

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询