Airflow DAG定义语法
Airflow DAG定义语法[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
DAG(Directed Acyclic Graph,有向无环图)是Apache Airflow的核心概念,用于定义任务及其依赖关系。DAG定义语法是编写Airflow工作流的基础,它通过Python代码描述任务的执行顺序、调度规则和依赖关系。本章节将详细介绍DAG的定义语法,包括基本结构、参数配置和实际应用。
DAG的基本结构[编辑 | 编辑源代码]
一个典型的DAG定义包含以下部分: 1. 导入模块:引入必要的Airflow类和函数。 2. DAG对象初始化:定义DAG的名称、描述、调度间隔等元数据。 3. 任务定义:使用Operator(如`PythonOperator`、`BashOperator`)创建具体任务。 4. 依赖关系:通过`>>`或`set_downstream`/`set_upstream`方法指定任务执行顺序。
以下是一个简单的DAG示例:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
# 初始化DAG对象
dag = DAG(
dag_id="example_dag",
description="A simple tutorial DAG",
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 1, 1),
catchup=False,
)
# 定义任务
task1 = BashOperator(
task_id="print_date",
bash_command="date",
dag=dag,
)
task2 = BashOperator(
task_id="sleep",
bash_command="sleep 5",
dag=dag,
)
# 设置依赖关系
task1 >> task2
代码解释[编辑 | 编辑源代码]
- `dag_id`:DAG的唯一标识符。 - `schedule_interval`:调度频率(如`timedelta(days=1)`表示每天运行一次)。 - `start_date`:DAG首次运行的日期。 - `catchup`:是否补跑过去未执行的任务(避免历史任务堆积)。 - `>>` 运算符:表示`task1`完成后执行`task2`。
DAG参数详解[编辑 | 编辑源代码]
以下是DAG初始化时的常用参数:
参数名 | 类型 | 描述 |
---|---|---|
str | DAG的唯一名称(必需) | ||
str | DAG的简短描述 | ||
str/timedelta | 调度间隔(如`"@daily"`或`timedelta(hours=2)`) | ||
datetime | 首次运行的起始时间 | ||
datetime | DAG的结束时间(可选) | ||
bool | 是否开启追赶模式 | ||
dict | 传递给所有任务的默认参数 |
默认参数示例[编辑 | 编辑源代码]
通过`default_args`统一配置任务参数:
default_args = {
"owner": "airflow",
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
dag_id="example_with_default_args",
default_args=default_args,
schedule_interval="@daily",
)
任务依赖关系[编辑 | 编辑源代码]
Airflow支持多种方式定义任务依赖:
1. 使用 `>>` 运算符[编辑 | 编辑源代码]
task1 >> task2 >> task3 # task1 → task2 → task3
2. 使用 `set_downstream`/`set_upstream`[编辑 | 编辑源代码]
task1.set_downstream(task2) # 等效于 task1 >> task2
task2.set_upstream(task1) # 等效于 task1 >> task2
3. 复杂依赖关系[编辑 | 编辑源代码]
使用列表定义并行或分支依赖:
[task1, task2] >> task3 # task1和task2并行执行,完成后触发task3
实际案例:数据处理DAG[编辑 | 编辑源代码]
以下是一个真实场景的DAG,模拟从API提取数据、清洗数据并存储到数据库的流程:
from airflow.operators.python import PythonOperator
def fetch_data():
print("Fetching data from API...")
def clean_data():
print("Cleaning data...")
def load_to_db():
print("Loading to PostgreSQL...")
with DAG(
dag_id="data_pipeline",
schedule_interval="@hourly",
start_date=datetime(2023, 1, 1),
) as dag:
fetch = PythonOperator(
task_id="fetch_data",
python_callable=fetch_data,
)
clean = PythonOperator(
task_id="clean_data",
python_callable=clean_data,
)
load = PythonOperator(
task_id="load_to_db",
python_callable=load_to_db,
)
fetch >> clean >> load
常见问题[编辑 | 编辑源代码]
Q: 如何避免DAG因代码错误无法加载? - 使用`airflow dags list`检查DAG是否成功加载。 - 通过`airflow tasks list example_dag`验证任务定义。
Q: `start_date`为什么影响调度? - Airflow根据`start_date + schedule_interval`计算首次运行时间。例如,`start_date=datetime(2023, 1, 1)`和`schedule_interval="@daily"`会在2023-01-02 00:00首次运行。
总结[编辑 | 编辑源代码]
DAG定义语法是Airflow工作流开发的核心,需掌握: 1. DAG对象的基本参数(如`dag_id`、`schedule_interval`)。 2. 任务依赖关系的多种定义方式。 3. 通过`default_args`统一管理任务参数。 4. 结合实际场景设计DAG结构。