跳转到内容

Airflow子DAG

来自代码酷

Airflow子DAG[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

子DAG(SubDAG)是Apache Airflow中用于模块化和管理复杂工作流的一种机制。它允许用户将一组相关的任务封装为一个独立的DAG,并在父DAG中以单个任务的形式调用。子DAG的主要目的是提高代码的可重用性和可维护性,同时避免父DAG变得过于庞大和难以管理。

子DAG的核心特点包括:

  • 封装性:将多个任务组合为一个逻辑单元。
  • 复用性:可在多个父DAG中重复使用相同的子DAG。
  • 层级结构:支持嵌套,子DAG内部可以再包含其他子DAG。

为什么使用子DAG?[编辑 | 编辑源代码]

在以下场景中,子DAG特别有用: 1. 任务分组:将逻辑相关的任务(如数据预处理步骤)封装为一个子DAG。 2. 减少视觉混乱:简化父DAG的图形化表示,使其更易读。 3. 并行执行:通过子DAG的并发控制,优化任务调度。

子DAG的基本用法[编辑 | 编辑源代码]

定义子DAG[编辑 | 编辑源代码]

子DAG是一个普通的DAG,但需要通过`SubDagOperator`在父DAG中调用。以下是一个简单的子DAG定义示例:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.subdag import SubDagOperator
from datetime import datetime

def create_subdag(parent_dag_name, child_dag_name, args):
    """
    定义一个子DAG的函数
    """
    subdag = DAG(
        dag_id=f"{parent_dag_name}.{child_dag_name}",
        default_args=args,
        schedule_interval="@daily",
    )

    with subdag:
        start = DummyOperator(task_id="start")
        end = DummyOperator(task_id="end")
        start >> end

    return subdag

# 父DAG定义
with DAG("parent_dag", start_date=datetime(2023, 1, 1)) as dag:
    subdag_task = SubDagOperator(
        task_id="run_subdag",
        subdag=create_subdag("parent_dag", "child_dag", dag.default_args),
    )

关键参数说明[编辑 | 编辑源代码]

  • `dag_id`:子DAG的ID必须采用`父DAG名.子DAG名`的格式。
  • `default_args`:子DAG继承父DAG的参数(如`start_date`、`retries`)。
  • `schedule_interval`:子DAG的调度间隔通常与父DAG一致。

子DAG的注意事项[编辑 | 编辑源代码]

并发与性能[编辑 | 编辑源代码]

  • 子DAG会占用一个完整的调度器槽(slot),可能导致资源浪费。
  • Airflow 2.0+推荐使用任务组(TaskGroup)替代子DAG,因为后者存在性能瓶颈。

错误处理[编辑 | 编辑源代码]

  • 子DAG中的任务失败会标记整个子DAG任务为失败。
  • 父DAG可以通过`trigger_rule`参数控制依赖行为。

实际案例:ETL流水线[编辑 | 编辑源代码]

以下是一个ETL(提取、转换、加载)流水线的子DAG实现示例,其中子DAG负责数据清洗:

def create_cleaning_subdag(parent_dag_name, child_dag_name, args):
    subdag = DAG(
        f"{parent_dag_name}.{child_dag_name}",
        default_args=args,
    )

    with subdag:
        extract = DummyOperator(task_id="extract_raw_data")
        clean_data = PythonOperator(
            task_id="clean_data",
            python_callable=lambda: print("Running data cleaning..."),
        )
        load_metadata = DummyOperator(task_id="update_metadata")

        extract >> clean_data >> load_metadata

    return subdag

子DAG vs 任务组[编辑 | 编辑源代码]

graph TD A[父DAG] -->|SubDagOperator| B(子DAG) A -->|TaskGroup| C[内联任务组] B --> D[独立DAG文件] C --> E[无需额外DAG]

| 特性 | 子DAG | 任务组 (TaskGroup) | |--------------------|------------------------|--------------------------| | **资源占用** | 高(独立槽位) | 低(共享槽位) | | **适用版本** | Airflow 1.x/2.x | Airflow 2.0+ | | **代码位置** | 需单独文件 | 可在同一文件 | | **调试难度** | 较高(跨DAG) | 较低(内联) |

数学表达:子DAG调度[编辑 | 编辑源代码]

子DAG的调度时间需与父DAG对齐。设父DAG的调度间隔为Tp,子DAG为Tc,则需满足: Tc=kTp(k)

总结[编辑 | 编辑源代码]

  • 子DAG适合在Airflow 1.x中模块化复杂工作流,但在2.x中更推荐使用任务组。
  • 始终确保子DAG的`dag_id`命名规范与父DAG关联。
  • 避免深层嵌套子DAG,以免增加调度复杂度。