跳转到内容

Airflow核心概念

来自代码酷
Admin留言 | 贡献2025年4月29日 (二) 18:51的版本 (Page creation by admin bot)

(差异) ←上一版本 | 已核准修订 (差异) | 最后版本 (差异) | 下一版本→ (差异)

Airflow核心概念[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Apache Airflow 是一个用于编排、调度和监控工作流的开源平台,其核心设计基于有向无环图(DAG)。理解Airflow的核心概念是掌握其工作原理的关键。本章将详细介绍以下核心组件:

  • DAG(有向无环图)
  • Operator(操作器)
  • Task(任务)
  • Scheduler(调度器)
  • Executor(执行器)
  • Variables & Connections(变量与连接)

DAG(有向无环图)[编辑 | 编辑源代码]

DAG是Airflow的核心数据结构,用于定义工作流的依赖关系和执行顺序。DAG中的每个节点代表一个任务,边代表任务间的依赖关系。

特性[编辑 | 编辑源代码]

  • 无环:任务依赖不能形成循环,否则会导致无限执行。
  • 动态定义:通过Python代码动态生成DAG。
  • 定时调度:支持基于时间或外部触发的调度。

示例代码[编辑 | 编辑源代码]

  
from airflow import DAG  
from airflow.operators.dummy import DummyOperator  
from datetime import datetime  

# 定义DAG  
dag = DAG(  
    dag_id="example_dag",  
    start_date=datetime(2023, 1, 1),  
    schedule_interval="@daily"  
)  

# 定义任务  
task1 = DummyOperator(task_id="task1", dag=dag)  
task2 = DummyOperator(task_id="task2", dag=dag)  

# 设置依赖  
task1 >> task2  # task2依赖task1

可视化表示[编辑 | 编辑源代码]

graph LR A[task1] --> B[task2]

Operator(操作器)[编辑 | 编辑源代码]

Operator定义了单个任务的执行逻辑。Airflow提供多种内置Operator,如:

  • BashOperator:执行Bash命令。
  • PythonOperator:调用Python函数。
  • EmailOperator:发送邮件。

示例代码[编辑 | 编辑源代码]

  
from airflow.operators.bash import BashOperator  

bash_task = BashOperator(  
    task_id="bash_example",  
    bash_command="echo 'Hello Airflow!'",  
    dag=dag  
)

Task(任务)[编辑 | 编辑源代码]

Task是DAG中的一个节点,是Operator的实例化对象。每个Task必须属于一个DAG,并通过依赖关系定义执行顺序。

Scheduler(调度器)[编辑 | 编辑源代码]

Scheduler负责解析DAG、检查调度周期,并将任务提交给Executor执行。其工作流程如下:

flowchart TD A[解析DAG] --> B[生成Task实例] B --> C[检查依赖和调度时间] C --> D[提交给Executor]

Executor(执行器)[编辑 | 编辑源代码]

Executor决定任务如何执行,常见类型包括:

  • LocalExecutor:本地并行执行。
  • CeleryExecutor:分布式执行。
  • KubernetesExecutor:在Kubernetes Pod中执行。

Variables & Connections(变量与连接)[编辑 | 编辑源代码]

  • Variables:存储全局配置,如环境变量。
  • Connections:管理外部系统的连接信息(如数据库、API)。

示例代码[编辑 | 编辑源代码]

  
from airflow.models import Variable  

# 设置和获取变量  
Variable.set("max_retries", 3)  
max_retries = Variable.get("max_retries")

实际案例[编辑 | 编辑源代码]

假设需要每天从数据库提取数据并发送报告: 1. 使用PythonOperator调用数据提取脚本。 2. 使用BashOperator运行数据处理命令。 3. 使用EmailOperator发送结果邮件。

依赖关系[编辑 | 编辑源代码]

graph LR A[Extract Data] --> B[Process Data] B --> C[Send Email]

数学基础(可选)[编辑 | 编辑源代码]

对于依赖关系的拓扑排序,Airflow使用以下算法: 解析失败 (语法错误): {\displaystyle T = \text{空列表} \\ \text{当DAG非空时:} \\ \quad \text{选择入度为0的节点} n \\ \quad T.\text{append}(n) \\ \quad \text{从DAG中移除} n \\ }

总结[编辑 | 编辑源代码]

Airflow的核心概念围绕DAG、Operator和调度系统展开。通过灵活组合这些组件,可以构建复杂的数据管道。初学者应从编写简单DAG开始,逐步掌握高级特性如动态任务生成和自定义Operator。