Airflow ExternalTaskSensor
Airflow ExternalTaskSensor[编辑 | 编辑源代码]
ExternalTaskSensor 是 Apache Airflow 中一种特殊的传感器(Sensor),用于监控外部 DAG(有向无环图)或任务的状态,并在满足条件时触发当前 DAG 的后续任务。它常用于跨 DAG 依赖的场景,确保工作流按正确的顺序执行。
基本概念[编辑 | 编辑源代码]
ExternalTaskSensor 通过轮询(polling)机制检查外部 DAG 或任务是否完成。它可以监控:
- 另一个 DAG 的特定任务实例(Task Instance)
- 另一个 DAG 的整个运行实例(DAG Run)
传感器会持续检查元数据库(如 PostgreSQL、MySQL)中的任务状态,直到外部任务成功完成或达到超时限制。
核心参数[编辑 | 编辑源代码]
以下是关键参数说明:
- external_dag_id:要监控的外部 DAG 的 ID(必需)
- external_task_id:要监控的外部任务的 ID(可选,不指定则监控整个 DAG)
- execution_date:外部任务的执行日期(可选,默认使用相同的逻辑日期)
- allowed_states:允许的状态列表(如
['success']
) - failed_states:失败的状态列表(如
['failed', 'skipped']
) - mode:传感器模式(
poke
或reschedule
) - timeout:超时时间(秒)
代码示例[编辑 | 编辑源代码]
基本用法[编辑 | 编辑源代码]
以下示例展示如何监控另一个 DAG (external_dag
) 中的特定任务 (external_task
):
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('consumer_dag',
default_args=default_args,
schedule_interval='@daily') as dag:
wait_for_external_task = ExternalTaskSensor(
task_id='wait_for_external_task',
external_dag_id='external_dag',
external_task_id='external_task',
execution_date="{{ execution_date }}",
allowed_states=['success'],
timeout=3600,
mode='poke',
poke_interval=60
)
监控整个 DAG[编辑 | 编辑源代码]
如果不指定 external_task_id
,则监控整个 DAG 的运行状态:
wait_for_external_dag = ExternalTaskSensor(
task_id='wait_for_external_dag',
external_dag_id='external_dag',
execution_date="{{ execution_date }}",
timeout=3600
)
执行日期处理[编辑 | 编辑源代码]
处理跨 DAG 依赖时,执行日期(execution_date
)的匹配至关重要。常见模式包括:
相同执行日期[编辑 | 编辑源代码]
execution_date="{{ execution_date }}"
偏移执行日期[编辑 | 编辑源代码]
监控前一天的任务:
execution_date="{{ prev_execution_date }}"
或使用时间增量:
execution_date="{{ execution_date - macros.timedelta(days=1) }}"
实际案例[编辑 | 编辑源代码]
数据管道依赖[编辑 | 编辑源代码]
假设有两个 DAG:
data_processing_dag
:处理原始数据report_generation_dag
:生成报告
报告生成必须在数据处理完成后执行:
实现代码:
# 在 report_generation_dag 中
wait_for_processing = ExternalTaskSensor(
task_id='wait_for_processing',
external_dag_id='data_processing_dag',
external_task_id='process_data',
execution_date="{{ execution_date }}",
timeout=3600
)
generate_report = BashOperator(
task_id='generate_report',
bash_command='generate_report.sh'
)
wait_for_processing >> generate_report
高级配置[编辑 | 编辑源代码]
跨时区调度[编辑 | 编辑源代码]
当 DAG 在不同时区运行时,需要调整 execution_date
:
execution_date="{{ execution_date.astimezone(pytz.timezone('Asia/Tokyo')) }}"
动态任务监控[编辑 | 编辑源代码]
使用 XCom 传递要监控的任务 ID:
external_task_id="{{ ti.xcom_pull(task_ids='get_task_id') }}"
常见问题[编辑 | 编辑源代码]
无限等待[编辑 | 编辑源代码]
可能原因:
- 外部任务未运行
execution_date
不匹配- 外部任务状态未更新
解决方案:
- 检查 Airflow 元数据库中的
task_instance
表 - 验证
execution_date
格式
性能优化[编辑 | 编辑源代码]
对于高频检查:
- 增加
poke_interval
(如 300 秒) - 使用
mode='reschedule'
释放工作器插槽
数学表达[编辑 | 编辑源代码]
传感器检查逻辑可以表示为: 解析失败 (未知函数“\begin{cases}”): {\displaystyle P(t) = \begin{cases} \text{True} & \text{if } S_{ext} \in \text{allowed\_states} \\ \text{False} & \text{otherwise} \end{cases} } 其中:
- 是时间 t 的检查结果
- 是外部任务状态
总结[编辑 | 编辑源代码]
ExternalTaskSensor 是实现跨 DAG 依赖的强大工具。关键要点:
- 精确匹配
execution_date
至关重要 - 监控单个任务或整个 DAG
- 合理设置超时和轮询间隔
- 适用于数据管道、报告系统等依赖场景
通过正确配置,可以构建复杂的跨 DAG 工作流,同时保持系统的可靠性和可维护性。