跳转到内容

Airflow DAG文件结构

来自代码酷

Airflow DAG文件结构[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow DAG(Directed Acyclic Graph)文件是定义工作流的核心组件,它以Python脚本的形式描述任务的依赖关系、调度逻辑和执行顺序。DAG文件的结构决定了工作流的可读性、可维护性和功能性。本指南将详细解析DAG文件的标准结构,帮助初学者和高级用户掌握其设计原则。

核心组成部分[编辑 | 编辑源代码]

一个典型的DAG文件包含以下部分:

1. 导入依赖[编辑 | 编辑源代码]

必须导入Airflow核心模块和所需的操作器(Operators)。

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

2. DAG定义[编辑 | 编辑源代码]

通过实例化DAG类定义工作流的基本属性:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='example_dag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['example'],
)
  • 关键参数说明:
 * dag_id: 唯一标识符  
 * schedule_interval: 调度频率(支持timedelta或CRON表达式)  
 * catchup: 是否执行历史任务  

3. 任务定义[编辑 | 编辑源代码]

使用Operator定义具体任务,并关联到DAG实例:

def print_hello():
    print("Hello, Airflow!")

task1 = BashOperator(
    task_id='bash_task',
    bash_command='echo "Executing Bash command"',
    dag=dag,
)

task2 = PythonOperator(
    task_id='python_task',
    python_callable=print_hello,
    dag=dag,
)

4. 依赖关系[编辑 | 编辑源代码]

通过>><<运算符定义任务顺序:

task1 >> task2  # task1执行成功后触发task2

完整示例[编辑 | 编辑源代码]

结合上述部分的完整DAG文件:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def print_hello():
    print("Hello, Airflow!")

default_args = {
    'owner': 'airflow',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='complete_example',
    default_args=default_args,
    schedule_interval='0 0 * * *',
    start_date=datetime(2023, 1, 1),
) as dag:

    start = BashOperator(
        task_id='start',
        bash_command='echo "Starting the workflow"',
    )

    process = PythonOperator(
        task_id='process_data',
        python_callable=print_hello,
    )

    end = BashOperator(
        task_id='end',
        bash_command='echo "Workflow completed"',
    )

    start >> process >> end

结构可视化[编辑 | 编辑源代码]

使用Mermaid展示任务依赖关系:

graph LR A[start] --> B[process_data] B --> C[end]

高级技巧[编辑 | 编辑源代码]

动态生成DAG[编辑 | 编辑源代码]

通过Python代码动态创建DAG(例如基于配置文件):

for i in range(3):
    task = BashOperator(
        task_id=f'dynamic_task_{i}',
        bash_command=f'echo "Task {i}"',
        dag=dag,
    )

Jinja模板[编辑 | 编辑源代码]

在参数中使用变量(如模板:Ds表示执行日期):

BashOperator(
    task_id='templated_task',
    bash_command='echo "Execution date: {{ ds }}"',
    dag=dag,
)

实际应用场景[编辑 | 编辑源代码]

电商数据处理流程: 1. 每日凌晨拉取订单数据(BashOperator调用API) 2. 清洗数据(PythonOperator调用Pandas脚本) 3. 生成报表并发送邮件(EmailOperator

对应的DAG依赖:

graph TD A[Fetch_Orders] --> B[Clean_Data] B --> C[Generate_Report]

最佳实践[编辑 | 编辑源代码]

  • 保持DAG文件简洁,复杂逻辑封装到单独模块
  • 使用default_args集中管理通用参数
  • 为每个task_id使用描述性名称
  • 避免在DAG文件中直接写入业务逻辑

常见问题[编辑 | 编辑源代码]

Q: 为什么我的DAG没有出现在Web UI? A: 检查: 1. 文件是否放在dags/目录 2. 是否有语法错误(使用python your_dag.py测试) 3. start_date是否在未来

Q: 如何调试任务失败? A: 查看日志中的错误堆栈,或本地测试Python函数逻辑。