跳转到内容

Airflow工作流定义

来自代码酷

Airflow工作流定义[编辑 | 编辑源代码]

Airflow工作流定义是Apache Airflow的核心概念之一,用于描述任务的执行顺序和依赖关系。工作流在Airflow中以有向无环图(DAG, Directed Acyclic Graph)的形式表示,其中每个节点代表一个任务(Task),边代表任务之间的依赖关系。本文将详细介绍如何定义Airflow工作流,包括DAG的结构、任务依赖关系以及实际应用示例。

什么是工作流定义?[编辑 | 编辑源代码]

在Airflow中,工作流定义是一个Python脚本,用于描述一组任务及其执行顺序。工作流由以下几个关键部分组成:

  • DAG(Directed Acyclic Graph):定义工作流的整体结构,包括调度时间、重试策略等。
  • Operators:定义具体的任务类型(如Python函数、Bash命令、SQL查询等)。
  • Tasks:Operator的实例,表示工作流中的一个具体执行单元。
  • Dependencies:定义任务之间的依赖关系,即执行顺序。

工作流定义的核心目标是确保任务按照正确的顺序执行,同时支持调度、监控和错误处理。

DAG的基本结构[编辑 | 编辑源代码]

以下是一个简单的DAG定义示例,展示如何创建一个包含两个任务的工作流:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def print_hello():
    print("Hello from PythonOperator!")

# 定义DAG
with DAG(
    dag_id="example_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    
    # 定义任务1:Bash命令
    task1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    # 定义任务2:Python函数
    task2 = PythonOperator(
        task_id="print_hello",
        python_callable=print_hello,
    )

    # 定义依赖关系:task1 -> task2
    task1 >> task2

代码解释[编辑 | 编辑源代码]

1. DAG初始化:通过DAG()构造函数定义DAG的基本属性,如dag_idstart_dateschedule_interval。 2. 任务定义

  * BashOperator执行Bash命令(如打印当前日期)。
  * PythonOperator调用Python函数(如打印"Hello")。

3. 依赖关系:通过>>符号表示task1必须在task2之前执行。

任务依赖关系的表示[编辑 | 编辑源代码]

Airflow支持多种方式定义任务依赖关系: 1. 使用>><<符号:

   task1 >> task2  # task1先执行,task2后执行
   task2 << task1  # 等价写法

2. 使用set_upstream()set_downstream()方法:

   task1.set_downstream(task2)  # task1 -> task2
   task2.set_upstream(task1)    # 等价写法

3. 复杂依赖关系(并行与串行结合):

   (task1 >> task2 >> task3)  # 串行执行
   (task1 >> [task2, task3])   # task1完成后,task2和task3并行执行

依赖关系图示例[编辑 | 编辑源代码]

以下是一个包含并行任务的DAG依赖关系图:

graph LR A[Task1] --> B[Task2] A --> C[Task3] B --> D[Task4] C --> D

解释:

  • Task1完成后,Task2和Task3并行执行。
  • Task4需要等待Task2和Task3都完成后才能执行。

实际案例:数据处理工作流[编辑 | 编辑源代码]

假设我们需要定义一个数据处理工作流,包含以下步骤: 1. 从API提取数据(extract_data)。 2. 清洗数据(clean_data)。 3. 将数据存储到数据库(load_data)。 4. 发送通知(send_notification)。

以下是完整的DAG定义:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Extracting data from API...")

def clean_data():
    print("Cleaning data...")

def load_data():
    print("Loading data to database...")

def send_notification():
    print("Sending notification...")

with DAG(
    dag_id="data_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    
    extract = PythonOperator(
        task_id="extract_data",
        python_callable=extract_data,
    )

    clean = PythonOperator(
        task_id="clean_data",
        python_callable=clean_data,
    )

    load = PythonOperator(
        task_id="load_data",
        python_callable=load_data,
    )

    notify = PythonOperator(
        task_id="send_notification",
        python_callable=send_notification,
    )

    # 定义依赖关系
    extract >> clean >> load >> notify

执行顺序[编辑 | 编辑源代码]

1. extract_dataclean_dataload_datasend_notification。 2. 每个任务必须成功完成后,下一个任务才会执行。

高级功能:动态任务生成[编辑 | 编辑源代码]

Airflow支持动态生成任务,适用于需要根据输入参数创建多个相似任务的场景。例如,处理多个文件:

files = ["file1.csv", "file2.csv", "file3.csv"]

with DAG(dag_id="dynamic_tasks", start_date=datetime(2023, 1, 1)) as dag:
    for file in files:
        process_task = PythonOperator(
            task_id=f"process_{file}",
            python_callable=lambda: print(f"Processing {file}"),
        )

输出[编辑 | 编辑源代码]

  • 生成3个任务:process_file1.csvprocess_file2.csvprocess_file3.csv
  • 每个任务独立执行,无依赖关系。

总结[编辑 | 编辑源代码]

  • Airflow工作流通过DAG定义,包含任务(Operators)和依赖关系。
  • 依赖关系决定了任务的执行顺序,支持串行、并行和复杂拓扑结构。
  • 实际应用中,工作流可用于ETL、机器学习管道、定时任务等场景。
  • 动态任务生成功能提高了工作流的灵活性。