跳转到内容

Airflow调度策略

来自代码酷

Airflow调度策略[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow调度策略是Apache Airflow中用于控制任务执行时间和频率的核心机制。它决定了DAG(有向无环图)何时被触发、任务如何排队以及依赖关系如何影响执行顺序。理解调度策略对设计可靠的数据管道至关重要,尤其是在需要定时或事件驱动的场景中。

Airflow的调度器(Scheduler)负责解析DAG文件、检查任务依赖关系,并根据配置的策略将任务推送到执行队列。主要调度策略包括:

  • 基于时间的调度(如Cron表达式或时间间隔)
  • 外部触发调度(如手动触发或API调用)
  • 依赖驱动的调度(任务基于上游状态执行)

核心调度机制[编辑 | 编辑源代码]

1. 基于时间的调度[编辑 | 编辑源代码]

Airflow默认使用类似Cron的语法定义调度间隔,通过DAG的schedule_interval参数配置。支持以下格式:

  • Cron表达式:* * * * *(分 时 日 月 周)
  • 预设值:@daily@hourly
  • timedelta对象:如timedelta(days=1)

代码示例[编辑 | 编辑源代码]

from datetime import datetime, timedelta
from airflow import DAG

# 使用Cron表达式
dag_cron = DAG(
    dag_id='cron_example',
    schedule_interval='0 0 * * *',  # 每天午夜执行
    start_date=datetime(2023, 1, 1)
)

# 使用timedelta
dag_delta = DAG(
    dag_id='delta_example',
    schedule_interval=timedelta(hours=2),  # 每2小时执行
    start_date=datetime(2023, 1, 1)
)

2. 外部触发调度[编辑 | 编辑源代码]

通过Airflow CLI或Web UI手动触发DAG运行,或通过REST API调用:

# CLI触发示例
airflow dags trigger my_dag_id

3. 依赖驱动的调度[编辑 | 编辑源代码]

任务通过set_upstream>>运算符定义依赖关系:

task1 >> task2  # task2仅在task1成功后执行

调度时间详解[编辑 | 编辑源代码]

Airflow的调度逻辑遵循以下规则:

  • start_date:调度的锚点时间
  • execution_date:任务实际执行的逻辑时间(注意:对于间隔任务,execution_date是间隔开始的时间戳)
  • 调度器在start_date+n×schedule_interval时触发运行

gantt title Airflow调度时间轴示例 dateFormat YYYY-MM-DD section DAG 每日任务 :a1, 2023-01-01, 1d a1 :a2, after a1, 1d a2 :a3, after a2, 1d

高级策略[编辑 | 编辑源代码]

回填(Backfilling)[编辑 | 编辑源代码]

通过catchup参数控制是否补跑历史任务:

DAG(
    dag_id='backfill_example',
    catchup=False,  # 禁用回填
    schedule_interval='@daily'
)

最新运行策略[编辑 | 编辑源代码]

使用LatestOnlyOperator跳过非最新调度周期的任务:

from airflow.operators.latest_only import LatestOnlyOperator

latest_only = LatestOnlyOperator(task_id='latest_only')
downstream_task = DummyOperator(task_id='downstream')
latest_only >> downstream_task

实际案例[编辑 | 编辑源代码]

场景:电商数据日报

  • 需求:每天凌晨处理前一天的订单数据
  • 解决方案:
dag = DAG(
    dag_id='ecommerce_daily',
    schedule_interval='0 2 * * *',  # 每天凌晨2点
    start_date=datetime(2023, 1, 1),
    catchup=False
)

extract = PythonOperator(task_id='extract', python_callable=extract_orders)
transform = PythonOperator(task_id='transform', python_callable=clean_data)
load = PythonOperator(task_id='load', python_callable=upload_to_warehouse)

extract >> transform >> load

常见问题[编辑 | 编辑源代码]

  • Q:为什么我的任务没有按预期时间触发?
 A:检查start_date时区是否与调度器一致,并确认DAG文件无语法错误。
  • Q:如何避免任务重叠执行?
 A:设置max_active_runs=1或在任务中使用wait_for_downstream=True

最佳实践[编辑 | 编辑源代码]

1. 始终明确设置start_date 2. 生产环境建议禁用catchup 3. 使用execution_timeout防止任务卡住 4. 复杂调度逻辑可拆分为多个DAG