跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow调度依赖
”︁
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow调度依赖 = '''Airflow调度依赖'''是Apache Airflow工作流自动化工具中的核心概念,用于定义任务(Task)之间的执行顺序和依赖关系。通过合理设置调度依赖,可以确保任务按照预期的逻辑顺序执行,从而实现复杂的数据管道和工作流管理。 == 介绍 == 在Airflow中,每个工作流由多个任务组成,这些任务通常以'''有向无环图(DAG, Directed Acyclic Graph)'''的形式组织。调度依赖决定了任务之间的执行顺序,即哪些任务必须在其他任务之前完成。Airflow提供了多种方式来定义这些依赖关系,确保工作流的正确性和可靠性。 调度依赖的主要类型包括: * '''线性依赖'''(一个任务完成后执行另一个任务) * '''并行依赖'''(多个任务可以同时执行) * '''条件依赖'''(根据条件决定是否执行后续任务) == 基本语法 == 在Airflow中,调度依赖通常通过位运算符(`>>` 和 `<<`)或`set_upstream`/`set_downstream`方法来定义。以下是基本语法示例: <syntaxhighlight lang="python"> from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from datetime import datetime # 定义DAG dag = DAG( 'example_dag', start_date=datetime(2023, 1, 1), schedule_interval='@daily' ) # 定义任务 task1 = DummyOperator(task_id='task1', dag=dag) task2 = DummyOperator(task_id='task2', dag=dag) task3 = DummyOperator(task_id='task3', dag=dag) # 定义依赖关系(方法1:使用位运算符) task1 >> task2 >> task3 # 等价于(方法2:使用set_downstream) # task1.set_downstream(task2) # task2.set_downstream(task3) </syntaxhighlight> === 输出解释 === 上述代码定义了一个简单的DAG,包含三个任务(`task1`、`task2`、`task3`),依赖关系为: * `task1` 必须在 `task2` 之前执行 * `task2` 必须在 `task3` 之前执行 == 复杂依赖关系 == Airflow支持更复杂的依赖关系,例如并行执行和条件分支。 === 并行依赖示例 === <syntaxhighlight lang="python"> task1 >> [task2, task3] >> task4 </syntaxhighlight> <mermaid> graph LR A[task1] --> B[task2] A --> C[task3] B --> D[task4] C --> D </mermaid> 解释: * `task1` 完成后,`task2` 和 `task3` 并行执行 * `task2` 和 `task3` 都完成后,`task4` 才会执行 === 条件依赖示例 === 通过`BranchPythonOperator`可以实现条件分支: <syntaxhighlight lang="python"> from airflow.operators.python_operator import BranchPythonOperator def decide_branch(**kwargs): if some_condition: return 'task2' else: return 'task3' branch_task = BranchPythonOperator( task_id='branch_task', python_callable=decide_branch, dag=dag ) task1 >> branch_task >> [task2, task3] </syntaxhighlight> == 实际案例 == === 数据管道示例 === 假设我们需要构建一个ETL(提取、转换、加载)管道: 1. 从数据库提取数据(`extract_data`) 2. 清洗数据(`clean_data`) 3. 并行执行两个分析任务(`analyze_data_A` 和 `analyze_data_B`) 4. 合并结果并存储(`store_results`) <syntaxhighlight lang="python"> extract_data >> clean_data >> [analyze_data_A, analyze_data_B] >> store_results </syntaxhighlight> <mermaid> graph LR A[extract_data] --> B[clean_data] B --> C[analyze_data_A] B --> D[analyze_data_B] C --> E[store_results] D --> E </mermaid> === 跨DAG依赖 === 通过`ExternalTaskSensor`可以实现跨DAG的依赖: <syntaxhighlight lang="python"> from airflow.sensors.external_task_sensor import ExternalTaskSensor wait_for_other_dag = ExternalTaskSensor( task_id='wait_for_other_dag', external_dag_id='other_dag_id', external_task_id='some_task', dag=dag ) wait_for_other_dag >> my_task </syntaxhighlight> == 高级主题 == === 动态依赖 === Airflow支持在运行时动态生成依赖关系: <syntaxhighlight lang="python"> for i in range(5): task = DummyOperator(task_id=f'task_{i}', dag=dag) if i > 0: previous_task >> task previous_task = task </syntaxhighlight> === 任务组依赖 === 在Airflow 2.0+中,可以使用`TaskGroup`组织任务并定义组间依赖: <syntaxhighlight lang="python"> from airflow.utils.task_group import TaskGroup with TaskGroup('group1', dag=dag) as tg1: task1 = DummyOperator(task_id='task1') task2 = DummyOperator(task_id='task2') task1 >> task2 with TaskGroup('group2', dag=dag) as tg2: task3 = DummyOperator(task_id='task3') tg1 >> tg2 </syntaxhighlight> == 常见问题 == '''Q: 如何查看任务的依赖关系?''' A: 可以通过Airflow UI的Graph视图查看,或使用命令行: <syntaxhighlight lang="bash"> airflow tasks list example_dag --tree </syntaxhighlight> '''Q: 如果循环依赖会怎样?''' A: Airflow会检测循环依赖并抛出`AirflowDagCycleException`。 == 数学表示 == 任务依赖可以形式化为偏序关系: <math> \text{对于任务集合 } T = \{t_1, t_2, ..., t_n\}, \text{依赖关系 } \prec \text{ 满足 } t_i \prec t_j \text{ 当且仅当 } t_i \text{ 必须在 } t_j \text{ 之前执行} </math> == 总结 == Airflow调度依赖是构建可靠工作流的基础,通过: * 简单的线性依赖 * 灵活的并行执行 * 强大的条件逻辑 * 跨DAG的协调能力 开发者可以构建各种复杂的数据管道和工作流。理解并正确应用这些依赖关系是掌握Airflow的关键。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow调度与触发]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)