Airflow调度策略
外观
Airflow调度策略[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow调度策略是Apache Airflow中用于控制任务执行时间和频率的核心机制。它决定了DAG(有向无环图)何时被触发、任务如何排队以及依赖关系如何影响执行顺序。理解调度策略对设计可靠的数据管道至关重要,尤其是在需要定时或事件驱动的场景中。
Airflow的调度器(Scheduler)负责解析DAG文件、检查任务依赖关系,并根据配置的策略将任务推送到执行队列。主要调度策略包括:
- 基于时间的调度(如Cron表达式或时间间隔)
- 外部触发调度(如手动触发或API调用)
- 依赖驱动的调度(任务基于上游状态执行)
核心调度机制[编辑 | 编辑源代码]
1. 基于时间的调度[编辑 | 编辑源代码]
Airflow默认使用类似Cron的语法定义调度间隔,通过DAG的schedule_interval
参数配置。支持以下格式:
- Cron表达式:
* * * * *
(分 时 日 月 周) - 预设值:
@daily
、@hourly
timedelta
对象:如timedelta(days=1)
代码示例[编辑 | 编辑源代码]
from datetime import datetime, timedelta
from airflow import DAG
# 使用Cron表达式
dag_cron = DAG(
dag_id='cron_example',
schedule_interval='0 0 * * *', # 每天午夜执行
start_date=datetime(2023, 1, 1)
)
# 使用timedelta
dag_delta = DAG(
dag_id='delta_example',
schedule_interval=timedelta(hours=2), # 每2小时执行
start_date=datetime(2023, 1, 1)
)
2. 外部触发调度[编辑 | 编辑源代码]
通过Airflow CLI或Web UI手动触发DAG运行,或通过REST API调用:
# CLI触发示例
airflow dags trigger my_dag_id
3. 依赖驱动的调度[编辑 | 编辑源代码]
任务通过set_upstream
或>>
运算符定义依赖关系:
task1 >> task2 # task2仅在task1成功后执行
调度时间详解[编辑 | 编辑源代码]
Airflow的调度逻辑遵循以下规则:
- start_date:调度的锚点时间
- execution_date:任务实际执行的逻辑时间(注意:对于间隔任务,execution_date是间隔开始的时间戳)
- 调度器在时触发运行
高级策略[编辑 | 编辑源代码]
回填(Backfilling)[编辑 | 编辑源代码]
通过catchup
参数控制是否补跑历史任务:
DAG(
dag_id='backfill_example',
catchup=False, # 禁用回填
schedule_interval='@daily'
)
最新运行策略[编辑 | 编辑源代码]
使用LatestOnlyOperator
跳过非最新调度周期的任务:
from airflow.operators.latest_only import LatestOnlyOperator
latest_only = LatestOnlyOperator(task_id='latest_only')
downstream_task = DummyOperator(task_id='downstream')
latest_only >> downstream_task
实际案例[编辑 | 编辑源代码]
场景:电商数据日报
- 需求:每天凌晨处理前一天的订单数据
- 解决方案:
dag = DAG(
dag_id='ecommerce_daily',
schedule_interval='0 2 * * *', # 每天凌晨2点
start_date=datetime(2023, 1, 1),
catchup=False
)
extract = PythonOperator(task_id='extract', python_callable=extract_orders)
transform = PythonOperator(task_id='transform', python_callable=clean_data)
load = PythonOperator(task_id='load', python_callable=upload_to_warehouse)
extract >> transform >> load
常见问题[编辑 | 编辑源代码]
- Q:为什么我的任务没有按预期时间触发?
A:检查start_date
时区是否与调度器一致,并确认DAG文件无语法错误。
- Q:如何避免任务重叠执行?
A:设置max_active_runs=1
或在任务中使用wait_for_downstream=True
。
最佳实践[编辑 | 编辑源代码]
1. 始终明确设置start_date
2. 生产环境建议禁用catchup
3. 使用execution_timeout
防止任务卡住
4. 复杂调度逻辑可拆分为多个DAG