跳转到内容

Airflow DAG定义语法

来自代码酷

Airflow DAG定义语法[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

DAG(Directed Acyclic Graph,有向无环图)是Apache Airflow的核心概念,用于定义任务及其依赖关系。DAG定义语法是编写Airflow工作流的基础,它通过Python代码描述任务的执行顺序、调度规则和依赖关系。本章节将详细介绍DAG的定义语法,包括基本结构、参数配置和实际应用。

DAG的基本结构[编辑 | 编辑源代码]

一个典型的DAG定义包含以下部分: 1. 导入模块:引入必要的Airflow类和函数。 2. DAG对象初始化:定义DAG的名称、描述、调度间隔等元数据。 3. 任务定义:使用Operator(如`PythonOperator`、`BashOperator`)创建具体任务。 4. 依赖关系:通过`>>`或`set_downstream`/`set_upstream`方法指定任务执行顺序。

以下是一个简单的DAG示例:

  
from datetime import datetime, timedelta  
from airflow import DAG  
from airflow.operators.bash import BashOperator  

# 初始化DAG对象  
dag = DAG(  
    dag_id="example_dag",  
    description="A simple tutorial DAG",  
    schedule_interval=timedelta(days=1),  
    start_date=datetime(2023, 1, 1),  
    catchup=False,  
)  

# 定义任务  
task1 = BashOperator(  
    task_id="print_date",  
    bash_command="date",  
    dag=dag,  
)  

task2 = BashOperator(  
    task_id="sleep",  
    bash_command="sleep 5",  
    dag=dag,  
)  

# 设置依赖关系  
task1 >> task2

代码解释[编辑 | 编辑源代码]

- `dag_id`:DAG的唯一标识符。 - `schedule_interval`:调度频率(如`timedelta(days=1)`表示每天运行一次)。 - `start_date`:DAG首次运行的日期。 - `catchup`:是否补跑过去未执行的任务(避免历史任务堆积)。 - `>>` 运算符:表示`task1`完成后执行`task2`。

DAG参数详解[编辑 | 编辑源代码]

以下是DAG初始化时的常用参数:

DAG参数列表
参数名 类型 描述
str | DAG的唯一名称(必需)
str | DAG的简短描述
str/timedelta | 调度间隔(如`"@daily"`或`timedelta(hours=2)`)
datetime | 首次运行的起始时间
datetime | DAG的结束时间(可选)
bool | 是否开启追赶模式
dict | 传递给所有任务的默认参数

默认参数示例[编辑 | 编辑源代码]

通过`default_args`统一配置任务参数:

  
default_args = {  
    "owner": "airflow",  
    "retries": 3,  
    "retry_delay": timedelta(minutes=5),  
}  

dag = DAG(  
    dag_id="example_with_default_args",  
    default_args=default_args,  
    schedule_interval="@daily",  
)

任务依赖关系[编辑 | 编辑源代码]

Airflow支持多种方式定义任务依赖:

1. 使用 `>>` 运算符[编辑 | 编辑源代码]

  
task1 >> task2 >> task3  # task1 → task2 → task3

2. 使用 `set_downstream`/`set_upstream`[编辑 | 编辑源代码]

  
task1.set_downstream(task2)  # 等效于 task1 >> task2  
task2.set_upstream(task1)    # 等效于 task1 >> task2

3. 复杂依赖关系[编辑 | 编辑源代码]

使用列表定义并行或分支依赖:

  
[task1, task2] >> task3  # task1和task2并行执行,完成后触发task3

task1
task3
task2

实际案例:数据处理DAG[编辑 | 编辑源代码]

以下是一个真实场景的DAG,模拟从API提取数据、清洗数据并存储到数据库的流程:

  
from airflow.operators.python import PythonOperator  

def fetch_data():  
    print("Fetching data from API...")  

def clean_data():  
    print("Cleaning data...")  

def load_to_db():  
    print("Loading to PostgreSQL...")  

with DAG(  
    dag_id="data_pipeline",  
    schedule_interval="@hourly",  
    start_date=datetime(2023, 1, 1),  
) as dag:  

    fetch = PythonOperator(  
        task_id="fetch_data",  
        python_callable=fetch_data,  
    )  

    clean = PythonOperator(  
        task_id="clean_data",  
        python_callable=clean_data,  
    )  

    load = PythonOperator(  
        task_id="load_to_db",  
        python_callable=load_to_db,  
    )  

    fetch >> clean >> load

常见问题[编辑 | 编辑源代码]

Q: 如何避免DAG因代码错误无法加载? - 使用`airflow dags list`检查DAG是否成功加载。 - 通过`airflow tasks list example_dag`验证任务定义。

Q: `start_date`为什么影响调度? - Airflow根据`start_date + schedule_interval`计算首次运行时间。例如,`start_date=datetime(2023, 1, 1)`和`schedule_interval="@daily"`会在2023-01-02 00:00首次运行。

总结[编辑 | 编辑源代码]

DAG定义语法是Airflow工作流开发的核心,需掌握: 1. DAG对象的基本参数(如`dag_id`、`schedule_interval`)。 2. 任务依赖关系的多种定义方式。 3. 通过`default_args`统一管理任务参数。 4. 结合实际场景设计DAG结构。