跳转到内容

Airflow DAG基础

来自代码酷

Airflow DAG基础[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

DAG(Directed Acyclic Graph,有向无环图)是Apache Airflow的核心概念,用于定义任务的工作流。DAG由一组任务(Tasks)及其依赖关系组成,确保任务按特定顺序执行且无循环依赖。DAG的本质是描述“做什么”(任务)和“何时做”(依赖关系),而非“如何做”(具体逻辑由任务实现)。

DAG的特性[编辑 | 编辑源代码]

1. 有向性:任务间的依赖关系是单向的(如A→B表示B依赖A)。 2. 无环性:依赖关系不能形成循环(如A→B→C→A是无效的)。 3. 动态性:DAG通过Python代码动态生成,支持灵活配置。

基本结构[编辑 | 编辑源代码]

一个DAG通常包含以下部分:

  • DAG定义:设置调度间隔、开始时间等元数据。
  • 任务(Operators):执行具体操作(如Bash命令、Python函数)。
  • 依赖关系:通过`>>`或`set_downstream`方法定义。

代码示例[编辑 | 编辑源代码]

  
from airflow import DAG  
from airflow.operators.bash import BashOperator  
from datetime import datetime  

# 定义DAG  
dag = DAG(  
    dag_id="example_dag",  
    start_date=datetime(2023, 1, 1),  
    schedule_interval="@daily",  
)  

# 定义任务  
task1 = BashOperator(  
    task_id="print_date",  
    bash_command="date",  
    dag=dag,  
)  

task2 = BashOperator(  
    task_id="echo_hello",  
    bash_command="echo 'Hello, Airflow!'",  
    dag=dag,  
)  

# 设置依赖关系  
task1 >> task2  # 等价于 task1.set_downstream(task2)

输出示例(假设DAG在2023-01-01执行):

  
[2023-01-01 00:00:00] Running task: print_date  
[2023-01-01 00:00:01] Output: Mon Jan 1 00:00:01 UTC 2023  
[2023-01-01 00:00:02] Running task: echo_hello  
[2023-01-01 00:00:02] Output: Hello, Airflow!  

依赖关系可视化[编辑 | 编辑源代码]

graph LR A[print_date] --> B[echo_hello]

实际应用案例[编辑 | 编辑源代码]

场景:每天凌晨处理日志文件并发送报告。 1. 任务1:下载日志文件(`BashOperator`调用`wget`)。 2. 任务2:解析日志(`PythonOperator`调用自定义函数)。 3. 任务3:发送邮件报告(`EmailOperator`)。

  
process_logs = BashOperator(task_id="download_logs", bash_command="wget example.com/logs.txt", dag=dag)  
analyze_data = PythonOperator(task_id="analyze_logs", python_callable=parse_logs, dag=dag)  
send_report = EmailOperator(task_id="send_email", to="admin@example.com", subject="Log Report", dag=dag)  

process_logs >> analyze_data >> send_report

常见问题[编辑 | 编辑源代码]

Q: DAG为何不能有循环依赖? A: Airflow依赖拓扑排序执行任务,循环会导致无限循环(如A→B→A)。

Q: `schedule_interval`支持哪些格式? A: 常见选项:

  • `@daily`/`@hourly`:预定义间隔。
  • `timedelta(days=1)`:Python时间差。
  • Cron表达式(如`0 0 * * *`)。

数学表达[编辑 | 编辑源代码]

DAG可表示为有序对G=(V,E),其中:

  • V是任务节点的集合。
  • E是边(依赖关系)的集合,且满足(u,v)E,uv

进阶提示[编辑 | 编辑源代码]

  • 使用`airflow.models.Variable`动态配置DAG参数。
  • 通过`default_args`为DAG中的所有任务设置默认参数(如重试次数)。

总结[编辑 | 编辑源代码]

DAG是Airflow工作流的蓝图,通过Python代码定义任务及其依赖关系。理解DAG的结构和特性是掌握Airflow的关键第一步。