Airflow工作流定义
Airflow工作流定义[编辑 | 编辑源代码]
Airflow工作流定义是Apache Airflow的核心概念之一,用于描述任务的执行顺序和依赖关系。工作流在Airflow中以有向无环图(DAG, Directed Acyclic Graph)的形式表示,其中每个节点代表一个任务(Task),边代表任务之间的依赖关系。本文将详细介绍如何定义Airflow工作流,包括DAG的结构、任务依赖关系以及实际应用示例。
什么是工作流定义?[编辑 | 编辑源代码]
在Airflow中,工作流定义是一个Python脚本,用于描述一组任务及其执行顺序。工作流由以下几个关键部分组成:
- DAG(Directed Acyclic Graph):定义工作流的整体结构,包括调度时间、重试策略等。
- Operators:定义具体的任务类型(如Python函数、Bash命令、SQL查询等)。
- Tasks:Operator的实例,表示工作流中的一个具体执行单元。
- Dependencies:定义任务之间的依赖关系,即执行顺序。
工作流定义的核心目标是确保任务按照正确的顺序执行,同时支持调度、监控和错误处理。
DAG的基本结构[编辑 | 编辑源代码]
以下是一个简单的DAG定义示例,展示如何创建一个包含两个任务的工作流:
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
def print_hello():
print("Hello from PythonOperator!")
# 定义DAG
with DAG(
dag_id="example_dag",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
) as dag:
# 定义任务1:Bash命令
task1 = BashOperator(
task_id="print_date",
bash_command="date",
)
# 定义任务2:Python函数
task2 = PythonOperator(
task_id="print_hello",
python_callable=print_hello,
)
# 定义依赖关系:task1 -> task2
task1 >> task2
代码解释[编辑 | 编辑源代码]
1. DAG初始化:通过DAG()
构造函数定义DAG的基本属性,如dag_id
、start_date
和schedule_interval
。
2. 任务定义:
*BashOperator
执行Bash命令(如打印当前日期)。 *PythonOperator
调用Python函数(如打印"Hello")。
3. 依赖关系:通过>>
符号表示task1
必须在task2
之前执行。
任务依赖关系的表示[编辑 | 编辑源代码]
Airflow支持多种方式定义任务依赖关系:
1. 使用>>
和<<
符号:
task1 >> task2 # task1先执行,task2后执行
task2 << task1 # 等价写法
2. 使用set_upstream()
和set_downstream()
方法:
task1.set_downstream(task2) # task1 -> task2
task2.set_upstream(task1) # 等价写法
3. 复杂依赖关系(并行与串行结合):
(task1 >> task2 >> task3) # 串行执行
(task1 >> [task2, task3]) # task1完成后,task2和task3并行执行
依赖关系图示例[编辑 | 编辑源代码]
以下是一个包含并行任务的DAG依赖关系图:
解释:
- Task1完成后,Task2和Task3并行执行。
- Task4需要等待Task2和Task3都完成后才能执行。
实际案例:数据处理工作流[编辑 | 编辑源代码]
假设我们需要定义一个数据处理工作流,包含以下步骤:
1. 从API提取数据(extract_data
)。
2. 清洗数据(clean_data
)。
3. 将数据存储到数据库(load_data
)。
4. 发送通知(send_notification
)。
以下是完整的DAG定义:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
print("Extracting data from API...")
def clean_data():
print("Cleaning data...")
def load_data():
print("Loading data to database...")
def send_notification():
print("Sending notification...")
with DAG(
dag_id="data_pipeline",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
) as dag:
extract = PythonOperator(
task_id="extract_data",
python_callable=extract_data,
)
clean = PythonOperator(
task_id="clean_data",
python_callable=clean_data,
)
load = PythonOperator(
task_id="load_data",
python_callable=load_data,
)
notify = PythonOperator(
task_id="send_notification",
python_callable=send_notification,
)
# 定义依赖关系
extract >> clean >> load >> notify
执行顺序[编辑 | 编辑源代码]
1. extract_data
→ clean_data
→ load_data
→ send_notification
。
2. 每个任务必须成功完成后,下一个任务才会执行。
高级功能:动态任务生成[编辑 | 编辑源代码]
Airflow支持动态生成任务,适用于需要根据输入参数创建多个相似任务的场景。例如,处理多个文件:
files = ["file1.csv", "file2.csv", "file3.csv"]
with DAG(dag_id="dynamic_tasks", start_date=datetime(2023, 1, 1)) as dag:
for file in files:
process_task = PythonOperator(
task_id=f"process_{file}",
python_callable=lambda: print(f"Processing {file}"),
)
输出[编辑 | 编辑源代码]
- 生成3个任务:
process_file1.csv
、process_file2.csv
、process_file3.csv
。 - 每个任务独立执行,无依赖关系。
总结[编辑 | 编辑源代码]
- Airflow工作流通过DAG定义,包含任务(Operators)和依赖关系。
- 依赖关系决定了任务的执行顺序,支持串行、并行和复杂拓扑结构。
- 实际应用中,工作流可用于ETL、机器学习管道、定时任务等场景。
- 动态任务生成功能提高了工作流的灵活性。