Airflow Sensor超时设置
外观
Airflow Sensor超时设置[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow Sensor 是 Apache Airflow 中用于监控外部系统状态的特殊算子,它会持续检查某个条件是否满足,直到条件为真或达到超时限制。超时设置是 Sensor 的核心配置之一,决定了 Sensor 等待条件满足的最长时间。若超时后条件仍未满足,Sensor 会标记为失败,触发相应的错误处理逻辑。
关键概念:
- Timeout:Sensor 允许运行的最大时间(秒或 timedelta 对象)。
- Soft Fail:超时后是否立即失败(默认)或继续重试(需结合 `mode="reschedule"`)。
- Poke Interval:两次检查条件之间的间隔时间。
基本语法[编辑 | 编辑源代码]
通过 `timeout` 参数设置超时,示例:
from airflow.sensors.filesystem import FileSensor
from datetime import timedelta
file_sensor = FileSensor(
task_id="wait_for_file",
filepath="/data/example.txt",
timeout=300, # 300秒(5分钟)后超时
poke_interval=30, # 每30秒检查一次
mode="poke" # 默认模式(持续占用工作线程)
)
超时模式详解[编辑 | 编辑源代码]
1. 立即失败(默认)[编辑 | 编辑源代码]
当 `mode="poke"` 时,超时后任务直接失败:
FileSensor(
task_id="sensor_fail",
timeout=60,
mode="poke" # 超时后抛出 AirflowSensorTimeout
)
2. 重新调度(Reschedule)[编辑 | 编辑源代码]
释放工作线程,超时后重新排队:
FileSensor(
task_id="sensor_reschedule",
timeout=timedelta(minutes=10),
mode="reschedule", # 超时后释放资源
poke_interval=120
)
数学原理[编辑 | 编辑源代码]
总检查次数计算公式: 解析失败 (语法错误): {\displaystyle n = \left\lfloor \frac{\text{timeout}}{\text{poke\_interval}} \right\rfloor }
实际案例[编辑 | 编辑源代码]
场景:监控数据库备份完成[编辑 | 编辑源代码]
等待数据库备份文件生成,超时1小时后通知运维:
from airflow.sensors.filesystem import FileSensor
from airflow.operators.email import EmailOperator
backup_sensor = FileSensor(
task_id="check_backup",
filepath="/backups/db_backup_{{ ds }}.sql",
timeout=3600,
poke_interval=300,
on_failure_callback=lambda context: EmailOperator(
task_id="alert_team",
to="admin@example.com",
subject="Backup Failed!",
html_content="Database backup not completed within 1 hour."
).execute(context)
)
高级配置[编辑 | 编辑源代码]
动态超时计算[编辑 | 编辑源代码]
使用 Python Callable 动态设置超时:
def dynamic_timeout(**context):
if context['execution_date'].weekday() == 6: # 周日
return 7200 # 2小时
return 1800 # 其他时间30分钟
DynamicTimeoutSensor(
task_id="dynamic_sensor",
timeout=dynamic_timeout,
...
)
常见问题[编辑 | 编辑源代码]
问题 | 解决方案 |
---|---|
超时不生效 | 检查 `execution_timeout` 是否冲突 |
资源占用高 | 使用 `mode="reschedule"` + 增大 `poke_interval` |
时区问题 | 确保 `timeout` 和 `execution_timeout` 使用相同时区 |
可视化流程[编辑 | 编辑源代码]
最佳实践[编辑 | 编辑源代码]
1. 生产环境建议设置 `timeout` < `execution_timeout`(DAG 级超时) 2. 长时间等待(>1小时)优先使用 `mode="reschedule"` 3. 关键路径任务需配置 `on_failure_callback`