跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow ExternalTaskSensor
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow ExternalTaskSensor = '''ExternalTaskSensor''' 是 Apache Airflow 中一种特殊的传感器(Sensor),用于监控外部 DAG(有向无环图)或任务的状态,并在满足条件时触发当前 DAG 的后续任务。它常用于跨 DAG 依赖的场景,确保工作流按正确的顺序执行。 == 基本概念 == ExternalTaskSensor 通过轮询(polling)机制检查外部 DAG 或任务是否完成。它可以监控: * 另一个 DAG 的特定任务实例(Task Instance) * 另一个 DAG 的整个运行实例(DAG Run) 传感器会持续检查元数据库(如 PostgreSQL、MySQL)中的任务状态,直到外部任务成功完成或达到超时限制。 === 核心参数 === 以下是关键参数说明: * '''external_dag_id''':要监控的外部 DAG 的 ID(必需) * '''external_task_id''':要监控的外部任务的 ID(可选,不指定则监控整个 DAG) * '''execution_date''':外部任务的执行日期(可选,默认使用相同的逻辑日期) * '''allowed_states''':允许的状态列表(如 <code>['success']</code>) * '''failed_states''':失败的状态列表(如 <code>['failed', 'skipped']</code>) * '''mode''':传感器模式(<code>poke</code> 或 <code>reschedule</code>) * '''timeout''':超时时间(秒) == 代码示例 == === 基本用法 === 以下示例展示如何监控另一个 DAG (<code>external_dag</code>) 中的特定任务 (<code>external_task</code>): <syntaxhighlight lang="python"> from airflow import DAG from airflow.sensors.external_task import ExternalTaskSensor from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'start_date': datetime(2023, 1, 1), } with DAG('consumer_dag', default_args=default_args, schedule_interval='@daily') as dag: wait_for_external_task = ExternalTaskSensor( task_id='wait_for_external_task', external_dag_id='external_dag', external_task_id='external_task', execution_date="{{ execution_date }}", allowed_states=['success'], timeout=3600, mode='poke', poke_interval=60 ) </syntaxhighlight> === 监控整个 DAG === 如果不指定 <code>external_task_id</code>,则监控整个 DAG 的运行状态: <syntaxhighlight lang="python"> wait_for_external_dag = ExternalTaskSensor( task_id='wait_for_external_dag', external_dag_id='external_dag', execution_date="{{ execution_date }}", timeout=3600 ) </syntaxhighlight> == 执行日期处理 == 处理跨 DAG 依赖时,执行日期(<code>execution_date</code>)的匹配至关重要。常见模式包括: === 相同执行日期 === <syntaxhighlight lang="python"> execution_date="{{ execution_date }}" </syntaxhighlight> === 偏移执行日期 === 监控前一天的任务: <syntaxhighlight lang="python"> execution_date="{{ prev_execution_date }}" </syntaxhighlight> 或使用时间增量: <syntaxhighlight lang="python"> execution_date="{{ execution_date - macros.timedelta(days=1) }}" </syntaxhighlight> == 实际案例 == === 数据管道依赖 === 假设有两个 DAG: * <code>data_processing_dag</code>:处理原始数据 * <code>report_generation_dag</code>:生成报告 报告生成必须在数据处理完成后执行: <mermaid> graph LR A[data_processing_dag: process_data] --> B[report_generation_dag: wait_for_processing] B --> C[report_generation_dag: generate_report] </mermaid> 实现代码: <syntaxhighlight lang="python"> # 在 report_generation_dag 中 wait_for_processing = ExternalTaskSensor( task_id='wait_for_processing', external_dag_id='data_processing_dag', external_task_id='process_data', execution_date="{{ execution_date }}", timeout=3600 ) generate_report = BashOperator( task_id='generate_report', bash_command='generate_report.sh' ) wait_for_processing >> generate_report </syntaxhighlight> == 高级配置 == === 跨时区调度 === 当 DAG 在不同时区运行时,需要调整 <code>execution_date</code>: <syntaxhighlight lang="python"> execution_date="{{ execution_date.astimezone(pytz.timezone('Asia/Tokyo')) }}" </syntaxhighlight> === 动态任务监控 === 使用 XCom 传递要监控的任务 ID: <syntaxhighlight lang="python"> external_task_id="{{ ti.xcom_pull(task_ids='get_task_id') }}" </syntaxhighlight> == 常见问题 == === 无限等待 === 可能原因: * 外部任务未运行 * <code>execution_date</code> 不匹配 * 外部任务状态未更新 解决方案: * 检查 Airflow 元数据库中的 <code>task_instance</code> 表 * 验证 <code>execution_date</code> 格式 === 性能优化 === 对于高频检查: * 增加 <code>poke_interval</code>(如 300 秒) * 使用 <code>mode='reschedule'</code> 释放工作器插槽 == 数学表达 == 传感器检查逻辑可以表示为: <math> P(t) = \begin{cases} \text{True} & \text{if } S_{ext} \in \text{allowed\_states} \\ \text{False} & \text{otherwise} \end{cases} </math> 其中: * <math>P(t)</math> 是时间 t 的检查结果 * <math>S_{ext}</math> 是外部任务状态 == 总结 == ExternalTaskSensor 是实现跨 DAG 依赖的强大工具。关键要点: * 精确匹配 <code>execution_date</code> 至关重要 * 监控单个任务或整个 DAG * 合理设置超时和轮询间隔 * 适用于数据管道、报告系统等依赖场景 通过正确配置,可以构建复杂的跨 DAG 工作流,同时保持系统的可靠性和可维护性。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow Sensors应用]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)