跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow SubDagOperator 详解
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow SubDagOperator 详解 = == 介绍 == '''SubDagOperator''' 是 Apache Airflow 中的一个核心 Operator,用于将复杂的 DAG(有向无环图)分解为更小的、可管理的子 DAG(Sub-DAG)。通过使用 SubDagOperator,用户可以将大型工作流模块化,提高代码的可读性和可维护性。Sub-DAG 本质上是一个嵌套的 DAG,它可以包含自己的任务和依赖关系,并在父 DAG 中以单个任务的形式出现。 SubDagOperator 特别适用于以下场景: * 需要重复使用一组任务的逻辑。 * 工作流过于庞大,需要分解为更小的部分。 * 多个 DAG 共享相同的子工作流。 == 基本语法与参数 == SubDagOperator 的基本语法如下: <syntaxhighlight lang="python"> from airflow.operators.subdag import SubDagOperator from airflow.models import DAG # 父 DAG parent_dag = DAG('parent_dag', default_args=default_args) # 子 DAG 工厂函数 def subdag_factory(parent_dag_name, child_dag_name, args): dag_subdag = DAG( dag_id=f'{parent_dag_name}.{child_dag_name}', default_args=args, schedule_interval="@daily", ) # 添加子 DAG 的任务 task1 = DummyOperator(task_id='task1', dag=dag_subdag) task2 = DummyOperator(task_id='task2', dag=dag_subdag) task1 >> task2 return dag_subdag # 在父 DAG 中使用 SubDagOperator subdag_task = SubDagOperator( task_id='subdag_task', subdag=subdag_factory('parent_dag', 'subdag_task', default_args), dag=parent_dag, ) </syntaxhighlight> === 关键参数 === * '''task_id''':子 DAG 任务的唯一标识符。 * '''subdag''':子 DAG 的定义,通常通过工厂函数生成。 * '''dag''':父 DAG 的引用。 == 工作原理 == SubDagOperator 的工作原理如下: 1. 父 DAG 调用 SubDagOperator,并传入子 DAG 的定义。 2. Airflow 将子 DAG 视为父 DAG 中的一个任务。 3. 子 DAG 内部的任务按依赖关系执行。 4. 子 DAG 的所有任务完成后,父 DAG 继续执行后续任务。 <mermaid> graph TD A[父 DAG 开始] --> B[SubDagOperator] B --> C[子 DAG Task1] C --> D[子 DAG Task2] D --> E[父 DAG 结束] </mermaid> == 实际案例 == 以下是一个实际应用场景:假设我们需要每天处理数据,并将处理逻辑分为数据抽取、转换和加载(ETL)三个步骤。我们可以将 ETL 逻辑封装为一个子 DAG,并在多个父 DAG 中复用。 === 代码示例 === <syntaxhighlight lang="python"> from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.operators.subdag import SubDagOperator from datetime import datetime default_args = { 'owner': 'airflow', 'start_date': datetime(2023, 1, 1), } def create_etl_subdag(parent_dag_name, child_dag_name, args): dag_subdag = DAG( dag_id=f'{parent_dag_name}.{child_dag_name}', default_args=args, schedule_interval="@daily", ) extract = DummyOperator(task_id='extract', dag=dag_subdag) transform = DummyOperator(task_id='transform', dag=dag_subdag) load = DummyOperator(task_id='load', dag=dag_subdag) extract >> transform >> load return dag_subdag # 父 DAG 定义 with DAG('daily_etl_pipeline', default_args=default_args, schedule_interval="@daily") as dag: start = DummyOperator(task_id='start') etl = SubDagOperator( task_id='etl', subdag=create_etl_subdag('daily_etl_pipeline', 'etl', default_args), dag=dag, ) end = DummyOperator(task_id='end') start >> etl >> end </syntaxhighlight> === 执行流程 === 1. 父 DAG 启动,执行 `start` 任务。 2. `etl` SubDagOperator 触发子 DAG 的执行: - 子 DAG 依次执行 `extract`、`transform`、`load`。 3. 子 DAG 完成后,父 DAG 执行 `end` 任务。 == 注意事项 == * '''避免循环依赖''':子 DAG 不能引用父 DAG 的任务,否则会导致循环依赖。 * '''并发控制''':子 DAG 的并发受父 DAG 的 `concurrency` 和 `max_active_runs` 参数限制。 * '''性能影响''':过多的子 DAG 可能导致调度性能下降。 == 数学表示 == 子 DAG 可以表示为父 DAG 的一个子图: <math>G_{\text{parent}} = (V_{\text{parent}}, E_{\text{parent}})</math> <math>G_{\text{subdag}} = (V_{\text{subdag}}, E_{\text{subdag}}), \quad V_{\text{subdag}} \subseteq V_{\text{parent}}, E_{\text{subdag}} \subseteq E_{\text{parent}}</math> == 总结 == SubDagOperator 是 Airflow 中实现工作流模块化的强大工具。通过将复杂逻辑分解为子 DAG,可以提高代码的可读性和复用性。但在使用时需注意避免循环依赖和性能问题。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow Operators详解]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)