跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow调度策略
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow调度策略 = == 介绍 == '''Airflow调度策略'''是Apache Airflow中用于控制任务执行时间和频率的核心机制。它决定了DAG(有向无环图)何时被触发、任务如何排队以及依赖关系如何影响执行顺序。理解调度策略对设计可靠的数据管道至关重要,尤其是在需要定时或事件驱动的场景中。 Airflow的调度器(Scheduler)负责解析DAG文件、检查任务依赖关系,并根据配置的策略将任务推送到执行队列。主要调度策略包括: * '''基于时间的调度'''(如Cron表达式或时间间隔) * '''外部触发调度'''(如手动触发或API调用) * '''依赖驱动的调度'''(任务基于上游状态执行) == 核心调度机制 == === 1. 基于时间的调度 === Airflow默认使用类似Cron的语法定义调度间隔,通过DAG的<code>schedule_interval</code>参数配置。支持以下格式: * Cron表达式:<code>* * * * *</code>(分 时 日 月 周) * 预设值:<code>@daily</code>、<code>@hourly</code> * <code>timedelta</code>对象:如<code>timedelta(days=1)</code> ==== 代码示例 ==== <syntaxhighlight lang="python"> from datetime import datetime, timedelta from airflow import DAG # 使用Cron表达式 dag_cron = DAG( dag_id='cron_example', schedule_interval='0 0 * * *', # 每天午夜执行 start_date=datetime(2023, 1, 1) ) # 使用timedelta dag_delta = DAG( dag_id='delta_example', schedule_interval=timedelta(hours=2), # 每2小时执行 start_date=datetime(2023, 1, 1) ) </syntaxhighlight> === 2. 外部触发调度 === 通过Airflow CLI或Web UI手动触发DAG运行,或通过REST API调用: <syntaxhighlight lang="bash"> # CLI触发示例 airflow dags trigger my_dag_id </syntaxhighlight> === 3. 依赖驱动的调度 === 任务通过<code>set_upstream</code>或<code>>></code>运算符定义依赖关系: <syntaxhighlight lang="python"> task1 >> task2 # task2仅在task1成功后执行 </syntaxhighlight> == 调度时间详解 == Airflow的调度逻辑遵循以下规则: * '''start_date''':调度的锚点时间 * '''execution_date''':任务实际执行的逻辑时间(注意:对于间隔任务,execution_date是间隔开始的时间戳) * 调度器在<math>start\_date + n \times schedule\_interval</math>时触发运行 <mermaid> gantt title Airflow调度时间轴示例 dateFormat YYYY-MM-DD section DAG 每日任务 :a1, 2023-01-01, 1d a1 :a2, after a1, 1d a2 :a3, after a2, 1d </mermaid> == 高级策略 == === 回填(Backfilling) === 通过<code>catchup</code>参数控制是否补跑历史任务: <syntaxhighlight lang="python"> DAG( dag_id='backfill_example', catchup=False, # 禁用回填 schedule_interval='@daily' ) </syntaxhighlight> === 最新运行策略 === 使用<code>LatestOnlyOperator</code>跳过非最新调度周期的任务: <syntaxhighlight lang="python"> from airflow.operators.latest_only import LatestOnlyOperator latest_only = LatestOnlyOperator(task_id='latest_only') downstream_task = DummyOperator(task_id='downstream') latest_only >> downstream_task </syntaxhighlight> == 实际案例 == '''场景:电商数据日报''' * 需求:每天凌晨处理前一天的订单数据 * 解决方案: <syntaxhighlight lang="python"> dag = DAG( dag_id='ecommerce_daily', schedule_interval='0 2 * * *', # 每天凌晨2点 start_date=datetime(2023, 1, 1), catchup=False ) extract = PythonOperator(task_id='extract', python_callable=extract_orders) transform = PythonOperator(task_id='transform', python_callable=clean_data) load = PythonOperator(task_id='load', python_callable=upload_to_warehouse) extract >> transform >> load </syntaxhighlight> == 常见问题 == * '''Q:为什么我的任务没有按预期时间触发?''' A:检查<code>start_date</code>时区是否与调度器一致,并确认DAG文件无语法错误。 * '''Q:如何避免任务重叠执行?''' A:设置<code>max_active_runs=1</code>或在任务中使用<code>wait_for_downstream=True</code>。 == 最佳实践 == 1. 始终明确设置<code>start_date</code> 2. 生产环境建议禁用<code>catchup</code> 3. 使用<code>execution_timeout</code>防止任务卡住 4. 复杂调度逻辑可拆分为多个DAG [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow调度与触发]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)