跳转到内容

Airflow Sensor超时设置

来自代码酷

Airflow Sensor超时设置[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow Sensor 是 Apache Airflow 中用于监控外部系统状态的特殊算子,它会持续检查某个条件是否满足,直到条件为真或达到超时限制。超时设置是 Sensor 的核心配置之一,决定了 Sensor 等待条件满足的最长时间。若超时后条件仍未满足,Sensor 会标记为失败,触发相应的错误处理逻辑。

关键概念:

  • Timeout:Sensor 允许运行的最大时间(秒或 timedelta 对象)。
  • Soft Fail:超时后是否立即失败(默认)或继续重试(需结合 `mode="reschedule"`)。
  • Poke Interval:两次检查条件之间的间隔时间。

基本语法[编辑 | 编辑源代码]

通过 `timeout` 参数设置超时,示例:

from airflow.sensors.filesystem import FileSensor
from datetime import timedelta

file_sensor = FileSensor(
    task_id="wait_for_file",
    filepath="/data/example.txt",
    timeout=300,  # 300秒(5分钟)后超时
    poke_interval=30,  # 每30秒检查一次
    mode="poke"  # 默认模式(持续占用工作线程)
)

超时模式详解[编辑 | 编辑源代码]

1. 立即失败(默认)[编辑 | 编辑源代码]

当 `mode="poke"` 时,超时后任务直接失败:

FileSensor(
    task_id="sensor_fail",
    timeout=60,
    mode="poke"  # 超时后抛出 AirflowSensorTimeout
)

2. 重新调度(Reschedule)[编辑 | 编辑源代码]

释放工作线程,超时后重新排队:

FileSensor(
    task_id="sensor_reschedule",
    timeout=timedelta(minutes=10),
    mode="reschedule",  # 超时后释放资源
    poke_interval=120
)

数学原理[编辑 | 编辑源代码]

总检查次数计算公式: 解析失败 (语法错误): {\displaystyle n = \left\lfloor \frac{\text{timeout}}{\text{poke\_interval}} \right\rfloor }

实际案例[编辑 | 编辑源代码]

场景:监控数据库备份完成[编辑 | 编辑源代码]

等待数据库备份文件生成,超时1小时后通知运维:

from airflow.sensors.filesystem import FileSensor
from airflow.operators.email import EmailOperator

backup_sensor = FileSensor(
    task_id="check_backup",
    filepath="/backups/db_backup_{{ ds }}.sql",
    timeout=3600,
    poke_interval=300,
    on_failure_callback=lambda context: EmailOperator(
        task_id="alert_team",
        to="admin@example.com",
        subject="Backup Failed!",
        html_content="Database backup not completed within 1 hour."
    ).execute(context)
)

高级配置[编辑 | 编辑源代码]

动态超时计算[编辑 | 编辑源代码]

使用 Python Callable 动态设置超时:

def dynamic_timeout(**context):
    if context['execution_date'].weekday() == 6:  # 周日
        return 7200  # 2小时
    return 1800  # 其他时间30分钟

DynamicTimeoutSensor(
    task_id="dynamic_sensor",
    timeout=dynamic_timeout,
    ...
)

常见问题[编辑 | 编辑源代码]

问题 解决方案
超时不生效 检查 `execution_timeout` 是否冲突
资源占用高 使用 `mode="reschedule"` + 增大 `poke_interval`
时区问题 确保 `timeout` 和 `execution_timeout` 使用相同时区

可视化流程[编辑 | 编辑源代码]

graph TD A[Sensor 启动] --> B{条件满足?} B -- 是 --> C[任务成功] B -- 否 --> D{超时?} D -- 是 --> E[任务失败] D -- 否 --> F[等待 poke_interval] F --> B

最佳实践[编辑 | 编辑源代码]

1. 生产环境建议设置 `timeout` < `execution_timeout`(DAG 级超时) 2. 长时间等待(>1小时)优先使用 `mode="reschedule"` 3. 关键路径任务需配置 `on_failure_callback`