跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow DAG触发规则
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow DAG触发规则 = '''Airflow DAG触发规则'''(Trigger Rules)是Apache Airflow中控制任务执行逻辑的核心机制之一。它定义了任务在满足何种条件下才能被触发执行,尤其是在处理依赖关系复杂的DAG(有向无环图)时尤为重要。本文将详细介绍Airflow支持的触发规则类型、适用场景及实际案例。 == 概述 == 在Airflow中,DAG由多个任务(Task)组成,任务之间通过上下游依赖关系(使用`>>`或`set_upstream/set_downstream`)连接。默认情况下,一个任务仅在其'''所有直接上游任务'''成功执行后才会触发(即`all_success`规则)。然而,某些场景需要更灵活的控制逻辑,例如: * 某个任务只要任意一个上游任务完成即可执行 * 上游任务失败时仍需要执行当前任务 * 完全忽略上游任务状态 Airflow通过`trigger_rule`参数提供多种触发规则选项,覆盖常见工作流需求。 == 触发规则类型 == 以下是Airflow支持的全部触发规则(截至2.0+版本): {| class="wikitable" ! 规则名称 !! 描述 !! 适用场景 |- | `all_success`(默认) || 所有上游任务'''必须成功''' || 标准顺序流程 |- | `all_failed` || 所有上游任务'''必须失败''' || 错误处理流程 |- | `all_done` || 所有上游任务'''完成'''(无论成功与否) || 清理或通知任务 |- | `one_success` || 至少一个上游任务成功 || 并行分支选择 |- | `one_failed` || 至少一个上游任务失败 || 快速失败检测 |- | `none_failed` || 没有上游任务失败(允许跳过或成功) || 条件分支 |- | `none_skipped` || 没有上游任务被跳过 || 严格顺序流程 |- | `dummy` || 完全忽略依赖关系 || 手动触发或测试 |} === 状态流转示意图 === <mermaid> graph TD A[Task A] -->|success| B[Task B: all_success] A -->|failed| C[Task C: all_failed] A -->|skipped| D[Task D: none_skipped] A -->|any state| E[Task E: all_done] </mermaid> == 代码示例 == 以下示例展示不同触发规则的实际应用: === 基础用法 === <syntaxhighlight lang="python"> from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.utils.dates import days_ago dag = DAG('trigger_rules_demo', start_date=days_ago(1)) start = DummyOperator(task_id='start', dag=dag) branch_a = DummyOperator(task_id='branch_a', dag=dag) branch_b = DummyOperator(task_id='branch_b', dag=dag) join = DummyOperator(task_id='join', dag=dag, trigger_rule='one_success') # 关键配置 start >> [branch_a, branch_b] >> join </syntaxhighlight> '''执行逻辑''': * `join`任务会在`branch_a`或`branch_b`中'''任意一个'''成功时触发 === 错误处理场景 === <syntaxhighlight lang="python"> process = PythonOperator(task_id='process_data', python_callable=process_fn, dag=dag) validate = PythonOperator(task_id='validate', python_callable=validate_fn, dag=dag) notify_failure = EmailOperator(task_id='notify_failure', dag=dag, trigger_rule='all_failed') process >> validate >> notify_failure </syntaxhighlight> '''执行逻辑''': * 仅当`validate`任务失败时才会发送失败通知 == 数学表达 == 触发规则可以形式化表示为状态集合的布尔运算。设<math>upstream(T)</math>为任务T的所有上游任务集合,<math>state(t)</math>为任务t的状态(成功/失败/跳过等),则: * `all_success`: <math>\forall t \in upstream(T), state(t) = success</math> * `one_failed`: <math>\exists t \in upstream(T), state(t) = failed</math> * `none_skipped`: <math>\nexists t \in upstream(T), state(t) = skipped</math> == 实际案例 == === 案例1:并行数据处理 === 场景:需要处理来自多个数据源的文件,只要任一数据源处理完成即执行汇总任务。 <mermaid> graph LR A[下载源A] --> C[汇总数据] B[下载源B] --> C style C fill:#f9f,stroke:#333 </mermaid> 配置代码: <syntaxhighlight lang="python"> summarize = PythonOperator( task_id='summarize', trigger_rule='one_success', # 任一上游成功即触发 ... ) </syntaxhighlight> === 案例2:条件警报系统 === 场景:当所有检测任务均失败时触发紧急警报。 <syntaxhighlight lang="python"> alert = SlackOperator( task_id='emergency_alert', trigger_rule='all_failed', # 所有上游失败才触发 ... ) </syntaxhighlight> == 常见问题 == '''Q: 如何跳过任务?''' * 使用`BranchPythonOperator`或通过PythonCallback返回`airflow.utils.state.State.SKIPPED` '''Q: `none_failed`与`all_success`的区别?''' * `none_failed`允许上游被跳过(SKIPPED),而`all_success`要求显式成功 '''Q: 为什么我的任务没有触发?''' * 检查:1) 上游任务状态 2) 触发规则配置 3) DAG运行日期是否匹配 == 最佳实践 == * 谨慎使用`dummy`规则,可能导致意外行为 * 在复杂工作流中为关键节点添加注释说明触发规则 * 使用`all_done`规则进行资源清理(如临时文件删除) * 测试时结合`airflow tasks test`命令验证触发逻辑 通过合理运用触发规则,可以构建出既能处理常规流程又能应对异常情况的健壮数据流水线。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow DAG开发]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)