跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow子DAG
”︁
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow子DAG = == 介绍 == '''子DAG(SubDAG)'''是Apache Airflow中用于模块化和管理复杂工作流的一种机制。它允许用户将一组相关的任务封装为一个独立的DAG,并在父DAG中以单个任务的形式调用。子DAG的主要目的是提高代码的可重用性和可维护性,同时避免父DAG变得过于庞大和难以管理。 子DAG的核心特点包括: * '''封装性''':将多个任务组合为一个逻辑单元。 * '''复用性''':可在多个父DAG中重复使用相同的子DAG。 * '''层级结构''':支持嵌套,子DAG内部可以再包含其他子DAG。 == 为什么使用子DAG? == 在以下场景中,子DAG特别有用: 1. '''任务分组''':将逻辑相关的任务(如数据预处理步骤)封装为一个子DAG。 2. '''减少视觉混乱''':简化父DAG的图形化表示,使其更易读。 3. '''并行执行''':通过子DAG的并发控制,优化任务调度。 == 子DAG的基本用法 == === 定义子DAG === 子DAG是一个普通的DAG,但需要通过`SubDagOperator`在父DAG中调用。以下是一个简单的子DAG定义示例: <syntaxhighlight lang="python"> from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.operators.subdag import SubDagOperator from datetime import datetime def create_subdag(parent_dag_name, child_dag_name, args): """ 定义一个子DAG的函数 """ subdag = DAG( dag_id=f"{parent_dag_name}.{child_dag_name}", default_args=args, schedule_interval="@daily", ) with subdag: start = DummyOperator(task_id="start") end = DummyOperator(task_id="end") start >> end return subdag # 父DAG定义 with DAG("parent_dag", start_date=datetime(2023, 1, 1)) as dag: subdag_task = SubDagOperator( task_id="run_subdag", subdag=create_subdag("parent_dag", "child_dag", dag.default_args), ) </syntaxhighlight> === 关键参数说明 === * '''`dag_id`''':子DAG的ID必须采用`父DAG名.子DAG名`的格式。 * '''`default_args`''':子DAG继承父DAG的参数(如`start_date`、`retries`)。 * '''`schedule_interval`''':子DAG的调度间隔通常与父DAG一致。 == 子DAG的注意事项 == === 并发与性能 === * 子DAG会占用一个完整的调度器槽(slot),可能导致资源浪费。 * Airflow 2.0+推荐使用'''任务组(TaskGroup)'''替代子DAG,因为后者存在性能瓶颈。 === 错误处理 === * 子DAG中的任务失败会标记整个子DAG任务为失败。 * 父DAG可以通过`trigger_rule`参数控制依赖行为。 == 实际案例:ETL流水线 == 以下是一个ETL(提取、转换、加载)流水线的子DAG实现示例,其中子DAG负责数据清洗: <syntaxhighlight lang="python"> def create_cleaning_subdag(parent_dag_name, child_dag_name, args): subdag = DAG( f"{parent_dag_name}.{child_dag_name}", default_args=args, ) with subdag: extract = DummyOperator(task_id="extract_raw_data") clean_data = PythonOperator( task_id="clean_data", python_callable=lambda: print("Running data cleaning..."), ) load_metadata = DummyOperator(task_id="update_metadata") extract >> clean_data >> load_metadata return subdag </syntaxhighlight> == 子DAG vs 任务组 == <mermaid> graph TD A[父DAG] -->|SubDagOperator| B(子DAG) A -->|TaskGroup| C[内联任务组] B --> D[独立DAG文件] C --> E[无需额外DAG] </mermaid> | 特性 | 子DAG | 任务组 (TaskGroup) | |--------------------|------------------------|--------------------------| | **资源占用** | 高(独立槽位) | 低(共享槽位) | | **适用版本** | Airflow 1.x/2.x | Airflow 2.0+ | | **代码位置** | 需单独文件 | 可在同一文件 | | **调试难度** | 较高(跨DAG) | 较低(内联) | == 数学表达:子DAG调度 == 子DAG的调度时间需与父DAG对齐。设父DAG的调度间隔为<math>T_p</math>,子DAG为<math>T_c</math>,则需满足: <math>T_c = k \cdot T_p \quad (k \in \mathbb{N})</math> == 总结 == * 子DAG适合在Airflow 1.x中模块化复杂工作流,但在2.x中更推荐使用任务组。 * 始终确保子DAG的`dag_id`命名规范与父DAG关联。 * 避免深层嵌套子DAG,以免增加调度复杂度。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow DAG开发]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)