跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow调度器
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow调度器 = '''Airflow调度器'''(Scheduler)是Apache Airflow的核心组件之一,负责解析[[DAG]](有向无环图)、触发任务执行并监控任务状态。它确保任务按照预定的时间间隔和依赖关系正确执行,是自动化工作流的关键部分。 == 概述 == 调度器的主要职责包括: * 解析DAG文件,构建任务依赖关系图。 * 根据调度间隔(schedule_interval)触发DAG运行。 * 将任务实例(Task Instance)发送给执行器(Executor)运行。 * 监控任务状态,处理重试、超时等逻辑。 调度器采用多进程架构,通过持续扫描`DAGs`目录和元数据库(如PostgreSQL/MySQL)来协调任务执行。其设计目标是高可靠性和可扩展性。 == 工作原理 == === 核心循环 === 调度器的主循环分为以下阶段: 1. '''DAG解析''':解析Python文件生成DAG对象。 2. '''调度决策''':检查DAG的`schedule_interval`,创建DAG Run。 3. '''任务排队''':将就绪的Task Instance加入执行队列。 4. '''状态同步''':与执行器通信更新任务状态。 <mermaid> graph TD A[扫描DAG目录] --> B[解析DAG文件] B --> C{是否到达调度时间?} C -->|是| D[创建DAG Run] C -->|否| A D --> E[生成Task Instances] E --> F[检查依赖关系] F --> G[将就绪任务加入队列] G --> H[执行器获取任务] </mermaid> === 调度时间计算 === Airflow使用'''时间点'''(execution_date)标记DAG运行的实际逻辑时间。对于每天运行的DAG: * 2023-01-02 00:00:00启动的运行,其`execution_date`为2023-01-01 00:00:00 * 计算公式:<math>execution\_date = next\_execution - schedule\_interval</math> == 配置与优化 == === 关键参数 === 在`airflow.cfg`中调整调度器性能: <syntaxhighlight lang="ini"> [scheduler] # 解析DAG的频率(秒) dag_dir_list_interval = 300 # 最大DAG解析进程数 max_dagruns_to_create_per_loop = 10 # 每个循环最大处理的任务数 max_tis_per_query = 512 </syntaxhighlight> === 高可用模式 === 通过以下方式实现调度器高可用: * 使用数据库行锁(如PostgreSQL advisory lock) * 部署多个调度器实例,但仅一个处于活跃状态 * 结合健康检查实现故障转移 == 代码示例 == === 定义调度间隔 === <syntaxhighlight lang="python"> from datetime import datetime, timedelta from airflow import DAG dag = DAG( 'my_daily_dag', schedule_interval=timedelta(days=1), # 每天运行 start_date=datetime(2023, 1, 1), ) </syntaxhighlight> === 手动触发说明 === 通过CLI手动触发DAG运行: <syntaxhighlight lang="bash"> # 执行特定DAG(使用当前时间作为逻辑日期) airflow dags trigger my_daily_dag # 指定execution_date airflow dags trigger --exec-date "2023-01-01T00:00:00" my_daily_dag </syntaxhighlight> == 实际案例 == '''电商数据管道场景''': 1. 每日00:30运行订单分析DAG 2. 依赖关系: * 先运行数据库导出任务 * 成功后再执行Spark分析任务 * 最后发送邮件报告 <mermaid> gantt title 电商数据管道执行时间线 dateFormat YYYY-MM-DD HH:mm section 任务依赖 数据库导出 :a1, 2023-01-02 00:30, 30m Spark分析 :a2, after a1, 2h 发送报告 :a3, after a2, 15m </mermaid> == 常见问题 == === 调度延迟 === 可能原因: * 系统资源不足(CPU/内存) * DAG文件过于复杂导致解析缓慢 * 数据库性能瓶颈 解决方案: * 增加`[scheduler]`配置中的`parsing_processes` * 优化DAG文件结构 * 使用更高效的元数据库 === 任务堆积 === 现象:大量任务处于"queued"状态 处理方法: * 调整`max_threads`增加并行度 * 使用更强大的执行器(如CeleryExecutor) * 实现优先级权重(`priority_weight`参数) == 高级主题 == === 自定义调度器 === 通过继承`airflow.scheduler.Scheduler`类实现: <syntaxhighlight lang="python"> from airflow.scheduler import Scheduler class CustomScheduler(Scheduler): def process_dag(self, dag): # 添加自定义逻辑 super().process_dag(dag) </syntaxhighlight> === 时区处理 === Airflow 2.0+默认使用UTC时间,建议: * 在`airflow.cfg`中设置`default_timezone` * 在DAG中明确指定时区: <syntaxhighlight lang="python"> dag = DAG( 'timezone_aware', start_date=datetime(2023, 1, 1, tzinfo=timezone('Asia/Shanghai')), ) </syntaxhighlight> == 最佳实践 == 1. 保持DAG文件简洁,避免复杂逻辑 2. 监控调度器日志(`airflow scheduler`输出) 3. 对频繁运行的DAG使用`@once`或`None`作为`schedule_interval` 4. 定期清理旧的DAG Run记录 == 总结 == Airflow调度器是工作流自动化的"大脑",理解其工作原理对于构建可靠的数据管道至关重要。通过合理配置和持续监控,可以确保任务按时执行并高效利用系统资源。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow基础]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)