跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow触发器规则
”︁
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow触发器规则 = '''Airflow触发器规则(Trigger Rules)'''是Apache Airflow中控制任务执行逻辑的核心机制之一,它定义了任务在满足何种条件下才能被触发执行。通过合理配置触发器规则,用户可以精确控制DAG(有向无环图)中任务间的依赖关系和行为。 == 基本概念 == 在Airflow中,默认情况下,任务仅当其'''所有直接上游任务'''均成功完成时才会被触发(对应规则`all_success`)。但实际场景中可能需要更灵活的条件,例如: * 部分上游失败时仍执行任务 * 任意上游成功即触发 * 显式跳过任务时触发下游 触发器规则通过`trigger_rule`参数配置,支持多种预设规则。 == 内置规则类型 == 下表列出Airflow所有内置触发器规则及其含义: {| class="wikitable" |+ 触发器规则列表 ! 规则名称 !! 描述 !! 适用场景 |- | `all_success` || (默认)所有上游任务成功 || 严格依赖所有前置任务 |- | `all_failed` || 所有上游任务失败 || 错误处理或备用路径 |- | `all_done` || 所有上游任务完成(无论状态) || 清理或日志收集 |- | `one_failed` || 至少一个上游任务失败(不等待全部完成) || 快速失败响应 |- | `one_success` || 至少一个上游任务成功 || 冗余任务选择 |- | `none_failed` || 无上游任务失败(允许跳过或成功) || 非严格流程 |- | `none_skipped` || 无上游任务被跳过 || 跳过敏感流程 |- | `dummy` || 无视依赖关系 || 手动触发或测试 |} == 代码示例 == 以下示例展示如何为任务设置不同的触发器规则: <syntaxhighlight lang="python"> from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.utils.dates import days_ago dag = DAG( 'trigger_rules_demo', start_date=days_ago(1), schedule_interval=None ) start = DummyOperator(task_id='start', dag=dag) branch_a = DummyOperator(task_id='branch_a', dag=dag) branch_b = DummyOperator(task_id='branch_b', dag=dag) join = DummyOperator( task_id='join', trigger_rule='none_failed', # 关键配置 dag=dag ) start >> [branch_a, branch_b] >> join </syntaxhighlight> '''执行逻辑说明''': 1. `start`任务首先执行 2. `branch_a`和`branch_b`并行运行 3. 即使其中一个分支失败,`join`任务仍会执行(因配置`none_failed`) == 状态传播机制 == 理解触发器规则需要明确Airflow的任务状态如何影响下游: <mermaid> graph LR A[成功] -->|all_success| B[触发] C[失败] -->|one_failed| D[立即触发] E[跳过] -->|none_skipped| F[不触发] </mermaid> 数学表达触发条件: <math> \text{Trigger} = \begin{cases} \text{True} & \text{if } \forall t \in \text{upstream}, \text{status}(t) \in S \\ \text{False} & \text{otherwise} \end{cases} </math> 其中<math>S</math>为规则允许的状态集合。 == 实际应用案例 == === 案例1:容错处理流程 === 在ETL管道中,当主数据处理失败时自动触发备用数据源: <syntaxhighlight lang="python"> primary_etl = PythonOperator(task_id='primary_etl', ...) fallback_etl = PythonOperator(task_id='fallback_etl', ...) notify = EmailOperator( task_id='notify', trigger_rule='one_success', ... ) primary_etl >> notify fallback_etl >> notify </syntaxhighlight> '''行为''':只要任一ETL成功即发送通知 === 案例2:多路径工作流 === 数据分析DAG中根据不同条件执行分支: <mermaid> graph TD A[数据加载] --> B[质量检查] B -->|通过| C[标准分析] B -->|失败| D[基础分析] E[生成报告] --> F{trigger_rule=all_done} </mermaid> == 高级配置技巧 == 1. '''动态规则选择''':通过XCom传递规则名称 <syntaxhighlight lang="python"> def decide_rule(**context): return 'one_success' if context['params']['fast_mode'] else 'all_success' DynamicRuleOperator( task_id='dynamic_rule', python_callable=decide_rule, provide_context=True ) </syntaxhighlight> 2. '''规则组合''':通过ShortCircuitOperator实现复杂逻辑 <syntaxhighlight lang="python"> check = ShortCircuitOperator( task_id='check_condition', python_callable=lambda: random.choice([True, False]) ) process = PythonOperator( task_id='process_data', trigger_rule='all_done', ... ) check >> process # 可能跳过process </syntaxhighlight> == 常见问题 == '''Q:为什么我的任务没有按预期触发?''' A:检查以下方面: * 上游任务的实际状态(注意"跳过"与"失败"的区别) * 触发器规则拼写是否正确(Airflow严格匹配字符串) * 是否存在跨DAG依赖(需使用ExternalTaskSensor) '''Q:如何调试触发器行为?''' A:使用CLI命令查看任务状态: <syntaxhighlight lang="bash"> airflow tasks list --tree dag_id # 显示依赖树 airflow task states dag_id task_id # 查看历史状态 </syntaxhighlight> == 最佳实践 == 1. 在测试环境充分验证复杂触发逻辑 2. 为关键任务添加超时设置(`execution_timeout`) 3. 避免过度使用`dummy`规则,可能破坏流程可见性 4. 在文档中明确记录非默认规则的使用原因 通过掌握触发器规则,您可以构建出既能处理常规流程又能优雅应对异常情况的健壮工作流。建议从简单规则开始,逐步尝试更复杂的组合模式。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow调度与触发]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)