跳转到内容

Airflow SubDagOperator 详解

来自代码酷

Airflow SubDagOperator 详解[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

SubDagOperator 是 Apache Airflow 中的一个核心 Operator,用于将复杂的 DAG(有向无环图)分解为更小的、可管理的子 DAG(Sub-DAG)。通过使用 SubDagOperator,用户可以将大型工作流模块化,提高代码的可读性和可维护性。Sub-DAG 本质上是一个嵌套的 DAG,它可以包含自己的任务和依赖关系,并在父 DAG 中以单个任务的形式出现。

SubDagOperator 特别适用于以下场景:

  • 需要重复使用一组任务的逻辑。
  • 工作流过于庞大,需要分解为更小的部分。
  • 多个 DAG 共享相同的子工作流。

基本语法与参数[编辑 | 编辑源代码]

SubDagOperator 的基本语法如下:

from airflow.operators.subdag import SubDagOperator
from airflow.models import DAG

# 父 DAG
parent_dag = DAG('parent_dag', default_args=default_args)

# 子 DAG 工厂函数
def subdag_factory(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id=f'{parent_dag_name}.{child_dag_name}',
        default_args=args,
        schedule_interval="@daily",
    )
    
    # 添加子 DAG 的任务
    task1 = DummyOperator(task_id='task1', dag=dag_subdag)
    task2 = DummyOperator(task_id='task2', dag=dag_subdag)
    task1 >> task2
    
    return dag_subdag

# 在父 DAG 中使用 SubDagOperator
subdag_task = SubDagOperator(
    task_id='subdag_task',
    subdag=subdag_factory('parent_dag', 'subdag_task', default_args),
    dag=parent_dag,
)

关键参数[编辑 | 编辑源代码]

  • task_id:子 DAG 任务的唯一标识符。
  • subdag:子 DAG 的定义,通常通过工厂函数生成。
  • dag:父 DAG 的引用。

工作原理[编辑 | 编辑源代码]

SubDagOperator 的工作原理如下: 1. 父 DAG 调用 SubDagOperator,并传入子 DAG 的定义。 2. Airflow 将子 DAG 视为父 DAG 中的一个任务。 3. 子 DAG 内部的任务按依赖关系执行。 4. 子 DAG 的所有任务完成后,父 DAG 继续执行后续任务。

graph TD A[父 DAG 开始] --> B[SubDagOperator] B --> C[子 DAG Task1] C --> D[子 DAG Task2] D --> E[父 DAG 结束]

实际案例[编辑 | 编辑源代码]

以下是一个实际应用场景:假设我们需要每天处理数据,并将处理逻辑分为数据抽取、转换和加载(ETL)三个步骤。我们可以将 ETL 逻辑封装为一个子 DAG,并在多个父 DAG 中复用。

代码示例[编辑 | 编辑源代码]

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.subdag import SubDagOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
}

def create_etl_subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id=f'{parent_dag_name}.{child_dag_name}',
        default_args=args,
        schedule_interval="@daily",
    )
    
    extract = DummyOperator(task_id='extract', dag=dag_subdag)
    transform = DummyOperator(task_id='transform', dag=dag_subdag)
    load = DummyOperator(task_id='load', dag=dag_subdag)
    
    extract >> transform >> load
    return dag_subdag

# 父 DAG 定义
with DAG('daily_etl_pipeline', default_args=default_args, schedule_interval="@daily") as dag:
    start = DummyOperator(task_id='start')
    etl = SubDagOperator(
        task_id='etl',
        subdag=create_etl_subdag('daily_etl_pipeline', 'etl', default_args),
        dag=dag,
    )
    end = DummyOperator(task_id='end')
    
    start >> etl >> end

执行流程[编辑 | 编辑源代码]

1. 父 DAG 启动,执行 `start` 任务。 2. `etl` SubDagOperator 触发子 DAG 的执行:

  - 子 DAG 依次执行 `extract`、`transform`、`load`。

3. 子 DAG 完成后,父 DAG 执行 `end` 任务。

注意事项[编辑 | 编辑源代码]

  • 避免循环依赖:子 DAG 不能引用父 DAG 的任务,否则会导致循环依赖。
  • 并发控制:子 DAG 的并发受父 DAG 的 `concurrency` 和 `max_active_runs` 参数限制。
  • 性能影响:过多的子 DAG 可能导致调度性能下降。

数学表示[编辑 | 编辑源代码]

子 DAG 可以表示为父 DAG 的一个子图: Gparent=(Vparent,Eparent) Gsubdag=(Vsubdag,Esubdag),VsubdagVparent,EsubdagEparent

总结[编辑 | 编辑源代码]

SubDagOperator 是 Airflow 中实现工作流模块化的强大工具。通过将复杂逻辑分解为子 DAG,可以提高代码的可读性和复用性。但在使用时需注意避免循环依赖和性能问题。