Airflow DAG文件结构
外观
Airflow DAG文件结构[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow DAG(Directed Acyclic Graph)文件是定义工作流的核心组件,它以Python脚本的形式描述任务的依赖关系、调度逻辑和执行顺序。DAG文件的结构决定了工作流的可读性、可维护性和功能性。本指南将详细解析DAG文件的标准结构,帮助初学者和高级用户掌握其设计原则。
核心组成部分[编辑 | 编辑源代码]
一个典型的DAG文件包含以下部分:
1. 导入依赖[编辑 | 编辑源代码]
必须导入Airflow核心模块和所需的操作器(Operators)。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
2. DAG定义[编辑 | 编辑源代码]
通过实例化DAG
类定义工作流的基本属性:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': True,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id='example_dag',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 1, 1),
catchup=False,
tags=['example'],
)
- 关键参数说明:
*dag_id
: 唯一标识符 *schedule_interval
: 调度频率(支持timedelta
或CRON表达式) *catchup
: 是否执行历史任务
3. 任务定义[编辑 | 编辑源代码]
使用Operator定义具体任务,并关联到DAG实例:
def print_hello():
print("Hello, Airflow!")
task1 = BashOperator(
task_id='bash_task',
bash_command='echo "Executing Bash command"',
dag=dag,
)
task2 = PythonOperator(
task_id='python_task',
python_callable=print_hello,
dag=dag,
)
4. 依赖关系[编辑 | 编辑源代码]
通过>>
或<<
运算符定义任务顺序:
task1 >> task2 # task1执行成功后触发task2
完整示例[编辑 | 编辑源代码]
结合上述部分的完整DAG文件:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
def print_hello():
print("Hello, Airflow!")
default_args = {
'owner': 'airflow',
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='complete_example',
default_args=default_args,
schedule_interval='0 0 * * *',
start_date=datetime(2023, 1, 1),
) as dag:
start = BashOperator(
task_id='start',
bash_command='echo "Starting the workflow"',
)
process = PythonOperator(
task_id='process_data',
python_callable=print_hello,
)
end = BashOperator(
task_id='end',
bash_command='echo "Workflow completed"',
)
start >> process >> end
结构可视化[编辑 | 编辑源代码]
使用Mermaid展示任务依赖关系:
高级技巧[编辑 | 编辑源代码]
动态生成DAG[编辑 | 编辑源代码]
通过Python代码动态创建DAG(例如基于配置文件):
for i in range(3):
task = BashOperator(
task_id=f'dynamic_task_{i}',
bash_command=f'echo "Task {i}"',
dag=dag,
)
Jinja模板[编辑 | 编辑源代码]
在参数中使用变量(如模板:Ds
表示执行日期):
BashOperator(
task_id='templated_task',
bash_command='echo "Execution date: {{ ds }}"',
dag=dag,
)
实际应用场景[编辑 | 编辑源代码]
电商数据处理流程:
1. 每日凌晨拉取订单数据(BashOperator
调用API)
2. 清洗数据(PythonOperator
调用Pandas脚本)
3. 生成报表并发送邮件(EmailOperator
)
对应的DAG依赖:
最佳实践[编辑 | 编辑源代码]
- 保持DAG文件简洁,复杂逻辑封装到单独模块
- 使用
default_args
集中管理通用参数 - 为每个
task_id
使用描述性名称 - 避免在DAG文件中直接写入业务逻辑
常见问题[编辑 | 编辑源代码]
Q: 为什么我的DAG没有出现在Web UI?
A: 检查:
1. 文件是否放在dags/
目录
2. 是否有语法错误(使用python your_dag.py
测试)
3. start_date
是否在未来
Q: 如何调试任务失败? A: 查看日志中的错误堆栈,或本地测试Python函数逻辑。