Airflow子DAG
Airflow子DAG[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
子DAG(SubDAG)是Apache Airflow中用于模块化和管理复杂工作流的一种机制。它允许用户将一组相关的任务封装为一个独立的DAG,并在父DAG中以单个任务的形式调用。子DAG的主要目的是提高代码的可重用性和可维护性,同时避免父DAG变得过于庞大和难以管理。
子DAG的核心特点包括:
- 封装性:将多个任务组合为一个逻辑单元。
- 复用性:可在多个父DAG中重复使用相同的子DAG。
- 层级结构:支持嵌套,子DAG内部可以再包含其他子DAG。
为什么使用子DAG?[编辑 | 编辑源代码]
在以下场景中,子DAG特别有用: 1. 任务分组:将逻辑相关的任务(如数据预处理步骤)封装为一个子DAG。 2. 减少视觉混乱:简化父DAG的图形化表示,使其更易读。 3. 并行执行:通过子DAG的并发控制,优化任务调度。
子DAG的基本用法[编辑 | 编辑源代码]
定义子DAG[编辑 | 编辑源代码]
子DAG是一个普通的DAG,但需要通过`SubDagOperator`在父DAG中调用。以下是一个简单的子DAG定义示例:
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.subdag import SubDagOperator
from datetime import datetime
def create_subdag(parent_dag_name, child_dag_name, args):
"""
定义一个子DAG的函数
"""
subdag = DAG(
dag_id=f"{parent_dag_name}.{child_dag_name}",
default_args=args,
schedule_interval="@daily",
)
with subdag:
start = DummyOperator(task_id="start")
end = DummyOperator(task_id="end")
start >> end
return subdag
# 父DAG定义
with DAG("parent_dag", start_date=datetime(2023, 1, 1)) as dag:
subdag_task = SubDagOperator(
task_id="run_subdag",
subdag=create_subdag("parent_dag", "child_dag", dag.default_args),
)
关键参数说明[编辑 | 编辑源代码]
- `dag_id`:子DAG的ID必须采用`父DAG名.子DAG名`的格式。
- `default_args`:子DAG继承父DAG的参数(如`start_date`、`retries`)。
- `schedule_interval`:子DAG的调度间隔通常与父DAG一致。
子DAG的注意事项[编辑 | 编辑源代码]
并发与性能[编辑 | 编辑源代码]
- 子DAG会占用一个完整的调度器槽(slot),可能导致资源浪费。
- Airflow 2.0+推荐使用任务组(TaskGroup)替代子DAG,因为后者存在性能瓶颈。
错误处理[编辑 | 编辑源代码]
- 子DAG中的任务失败会标记整个子DAG任务为失败。
- 父DAG可以通过`trigger_rule`参数控制依赖行为。
实际案例:ETL流水线[编辑 | 编辑源代码]
以下是一个ETL(提取、转换、加载)流水线的子DAG实现示例,其中子DAG负责数据清洗:
def create_cleaning_subdag(parent_dag_name, child_dag_name, args):
subdag = DAG(
f"{parent_dag_name}.{child_dag_name}",
default_args=args,
)
with subdag:
extract = DummyOperator(task_id="extract_raw_data")
clean_data = PythonOperator(
task_id="clean_data",
python_callable=lambda: print("Running data cleaning..."),
)
load_metadata = DummyOperator(task_id="update_metadata")
extract >> clean_data >> load_metadata
return subdag
子DAG vs 任务组[编辑 | 编辑源代码]
| 特性 | 子DAG | 任务组 (TaskGroup) | |--------------------|------------------------|--------------------------| | **资源占用** | 高(独立槽位) | 低(共享槽位) | | **适用版本** | Airflow 1.x/2.x | Airflow 2.0+ | | **代码位置** | 需单独文件 | 可在同一文件 | | **调试难度** | 较高(跨DAG) | 较低(内联) |
数学表达:子DAG调度[编辑 | 编辑源代码]
子DAG的调度时间需与父DAG对齐。设父DAG的调度间隔为,子DAG为,则需满足:
总结[编辑 | 编辑源代码]
- 子DAG适合在Airflow 1.x中模块化复杂工作流,但在2.x中更推荐使用任务组。
- 始终确保子DAG的`dag_id`命名规范与父DAG关联。
- 避免深层嵌套子DAG,以免增加调度复杂度。