跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow调度概念
”︁
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow调度概念 = '''Apache Airflow'''的调度系统是其核心功能之一,允许用户以编程方式定义、调度和监控工作流(DAG)。本节将详细介绍Airflow的调度机制,包括调度器的工作原理、时间参数配置以及实际应用案例。 == 调度器简介 == Airflow的调度器(Scheduler)是一个持续运行的守护进程,负责解析DAG文件、触发任务实例(Task Instances)并根据预设的时间表执行任务。其核心功能包括: * '''DAG解析''':定期扫描<code>dags_folder</code>以加载或更新DAG定义。 * '''任务触发''':根据<code>schedule_interval</code>和<code>start_date</code>计算任务执行时间。 * '''依赖检查''':确保任务仅在满足上游依赖后运行。 === 关键术语 === * '''DAG(有向无环图)''':定义任务及其依赖关系的工作流。 * '''Execution Date''':任务逻辑上的执行时间(注意与实际运行时间不同)。 * '''Schedule Interval''':定义DAG运行频率的间隔(如<code>@daily</code>、<code>0 * * * *</code>)。 == 调度时间配置 == Airflow使用[[cron]]语法和[[timedelta]]对象定义调度间隔。以下是一个DAG的调度配置示例: <syntaxhighlight lang="python"> from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime, timedelta # 定义DAG,每天凌晨1点运行 dag = DAG( dag_id='example_scheduler_dag', schedule_interval='0 1 * * *', # cron语法 start_date=datetime(2023, 1, 1), catchup=False ) task = BashOperator( task_id='print_date', bash_command='date', dag=dag ) </syntaxhighlight> === 执行日期(Execution Date)详解 === Airflow的<code>execution_date</code>表示任务逻辑上的执行时间窗口。例如: * 对于<code>schedule_interval='@daily'</code>的DAG,2023-01-02运行的实例其<code>execution_date</code>为2023-01-01(表示处理的是前一日的数据)。 数学表示为: <math> \text{实际运行时间} = \text{execution\_date} + \text{schedule\_interval} </math> == 调度器工作原理 == Airflow调度器通过以下步骤触发任务: 1. 解析DAG文件并注册到元数据库。 2. 根据<code>start_date</code>和<code>schedule_interval</code>生成DAG Run。 3. 检查任务依赖(上游任务是否成功)。 4. 将可运行的任务实例发送到执行器(Executor)。 <mermaid> graph TD A[扫描DAG文件] --> B[创建DAG Run] B --> C{检查依赖} C -->|满足| D[触发任务] C -->|不满足| E[等待] </mermaid> == 实际案例 == === 数据管道每日调度 === 假设需要每天凌晨处理前一天的日志数据: <syntaxhighlight lang="python"> dag = DAG( dag_id='log_processing', schedule_interval=timedelta(days=1), start_date=datetime(2023, 1, 1) ) extract = PythonOperator( task_id='extract_logs', python_callable=extract_function, dag=dag ) load = PythonOperator( task_id='load_to_database', python_callable=load_function, dag=dag ) extract >> load </syntaxhighlight> === 避免调度陷阱 === * '''避免动态<code>start_date</code>''':如使用<code>datetime.now()</code>会导致意外行为。 * '''理解catchup参数''':若设为<code>True</code>,调度器会补跑历史未执行的DAG Run。 == 高级配置 == === 使用Timetables自定义调度 === Airflow 2.2+支持通过<code>Timetable</code>类实现非标准调度逻辑(如跳过节假日): <syntaxhighlight lang="python"> class BusinessDayTimetable(Timetable): def infer_manual_data_interval(self, run_after): return DataInterval(start=run_after - timedelta(days=1), end=run_after) dag = DAG( schedule=BusinessDayTimetable(), ... ) </syntaxhighlight> == 常见问题 == * '''Q: 为什么任务没有按预期时间触发?''' * 检查<code>start_date</code>是否早于当前时间,且<code>schedule_interval</code>已正确设置。 * '''Q: <code>execution_date</code>为何是过去时间?''' * 这是Airflow的设计特性,表示任务处理的数据所属时间窗口。 == 总结 == Airflow的调度系统通过灵活的配置满足不同场景需求,但需深入理解其时间计算逻辑以避免常见错误。建议通过[[Airflow官方文档]]进一步探索高级调度功能。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow调度与触发]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)