跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow回填机制
”︁
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow回填机制 = == 介绍 == '''Airflow回填机制'''(Backfilling)是Apache Airflow中一项核心功能,允许用户针对历史时间段重新运行或补录任务。该机制常用于以下场景: * 修复因代码错误或系统故障导致的历史任务失败 * 首次部署DAG时处理过去的数据 * 测试新逻辑在历史数据上的表现 回填通过调度器为指定时间范围生成任务实例(Task Instances),并按依赖关系依次执行。与常规调度不同,回填会忽略当前时间,专注于处理过去的时间窗口。 == 核心原理 == 回填的关键参数包括: * <code>start_date</code>:回填的起始日期 * <code>end_date</code>:回填的结束日期 * <code>mark_success</code>:是否将任务标记为成功(不实际运行) * <code>rerun_failed_tasks</code>:仅重试失败任务 Airflow通过以下步骤实现回填: 1. 解析DAG文件,识别任务依赖 2. 为指定时间范围内的每个调度间隔创建任务实例 3. 按照DAG依赖关系依次执行实例 === 时间范围计算 === 回填的时间窗口由<code>schedule_interval</code>决定。例如,每日调度的DAG(<code>schedule_interval="@daily"</code>)在回填2023-01-01至2023-01-03时,会生成3个任务实例: * 2023-01-01 00:00:00 * 2023-01-02 00:00:00 * 2023-01-03 00:00:00 == 操作方式 == === CLI命令 === 通过<code>airflow dags backfill</code>命令执行回填: <syntaxhighlight lang="bash"> # 基本回填示例 airflow dags backfill -s 2023-01-01 -e 2023-01-03 my_dag_id # 仅标记成功(不实际运行) airflow dags backfill -s 2023-01-01 -e 2023-01-03 --mark_success my_dag_id # 重试失败任务 airflow dags backfill -s 2023-01-01 -e 2023-01-03 --rerun_failed_tasks my_dag_id </syntaxhighlight> === Python API === 可通过<code>BackfillJob</code>编程实现: <syntaxhighlight lang="python"> from airflow.models import DagBag from airflow.jobs.backfill_job import BackfillJob dag_id = "my_dag_id" dag_bag = DagBag() dag = dag_bag.get_dag(dag_id) backfill_job = BackfillJob( dag=dag, start_date="2023-01-01", end_date="2023-01-03", ignore_first_depends_on_past=True ) backfill_job.run() </syntaxhighlight> == 实际案例 == === 场景:补录天气数据 === 某气象分析DAG每天下载并处理天气数据。因API故障导致2023-06-01至2023-06-05数据缺失,修复后需回填: <mermaid> graph LR A[下载数据] --> B[清洗数据] B --> C[生成报告] </mermaid> 执行命令: <syntaxhighlight lang="bash"> airflow dags backfill -s 2023-06-01 -e 2023-06-05 weather_pipeline </syntaxhighlight> 系统会按顺序为每天独立执行完整流程,各天的任务实例互不影响。 == 注意事项 == * '''依赖传递''':若设置<code>depends_on_past=True</code>,需添加<code>--ignore_first_depends_on_past</code>参数 * '''资源消耗''':大规模回填可能占用大量计算资源 * '''数据幂等性''':确保任务能安全重复执行 * '''执行时间''':回填速度受<code>max_active_runs</code>参数限制 == 高级配置 == === 并行控制 === 通过<code>--pool</code>和<code>--max_active_runs</code>限制并发: <syntaxhighlight lang="bash"> airflow dags backfill -s 2023-01-01 -e 2023-01-31 --max_active_runs 5 my_dag_id </syntaxhighlight> === 动态时间范围 === 结合Python生成动态日期: <syntaxhighlight lang="python"> from datetime import datetime, timedelta end_date = datetime.now() start_date = end_date - timedelta(days=30) </syntaxhighlight> == 数学原理 == 对于包含<math>n</math>个调度间隔的回填,总任务实例数计算为: <math> \text{实例数} = \frac{\text{end\_date} - \text{start\_date}}{\text{schedule\_interval}} + 1 </math> == 常见问题 == '''Q:回填会触发后续任务吗?''' A:不会。回填仅处理指定时间范围内的任务实例,不影响后续调度。 '''Q:如何停止正在进行的回填?''' A:删除回填作业记录: <syntaxhighlight lang="sql"> DELETE FROM airflow.jobs WHERE job_type = 'BackfillJob' AND dag_id = 'my_dag_id'; </syntaxhighlight> == 总结 == Airflow回填机制是处理历史数据的重要工具,通过合理控制时间范围和参数,可以高效完成数据补录、故障恢复等操作。建议在生产环境中使用前,先在测试环境验证任务幂等性和资源消耗。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow调度与触发]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)