跳转到内容

Airflow TimeSensor

来自代码酷

Airflow TimeSensor[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow TimeSensor 是 Apache Airflow 中的一种特殊传感器(Sensor),用于等待特定时间点到达后再继续执行后续任务。与基于时间间隔的等待(如 `TimeDeltaSensor`)不同,`TimeSensor` 直接监测一个绝对时间戳,常用于调度需要精确时间触发的任务。

核心概念[编辑 | 编辑源代码]

  • 绝对时间触发:等待直到系统时间达到指定的 `target_time`。
  • 非阻塞检查:通过 `poke_interval` 参数控制检查频率,避免资源浪费。
  • 时区处理:支持带时区的 `datetime` 对象,确保跨时区调度准确性。

基本用法[编辑 | 编辑源代码]

以下示例展示如何在 DAG 中使用 `TimeSensor`:

from airflow import DAG
from airflow.sensors.time_sensor import TimeSensor
from datetime import datetime, time

with DAG('time_sensor_example', start_date=datetime(2023, 1, 1)) as dag:
    # 等待直到当天14:30:00
    wait_for_time = TimeSensor(
        task_id='wait_for_1430',
        target_time=time(14, 30),
    )

参数详解[编辑 | 编辑源代码]

参数 类型 说明
`datetime.time` | 必须参数,指定目标时间(时:分:秒)
`int` | 可选,检查间隔秒数(默认60)
`int` | 可选,超时秒数(默认7天)

高级特性[编辑 | 编辑源代码]

时区敏感示例[编辑 | 编辑源代码]

处理跨时区场景时需显式指定时区:

from pendulum import timezone

tz = timezone("Europe/Paris")
target_time = time(12, 0, tzinfo=tz)

动态时间计算[编辑 | 编辑源代码]

结合 `PythonOperator` 动态生成目标时间:

def _calculate_target_time(**context):
    execution_date = context['execution_date']
    return execution_date.add(hours=3).time()

TimeSensor(
    task_id='dynamic_time',
    target_time=_calculate_target_time,
)

实际案例[编辑 | 编辑源代码]

场景:跨系统时间同步[编辑 | 编辑源代码]

某金融系统需在交易所开盘(09:30 EST)后启动数据处理流程:

graph LR A[TimeSensor: 09:30 EST] --> B[下载市场数据] B --> C[数据清洗]

代码实现[编辑 | 编辑源代码]

from pytz import timezone

est = timezone('US/Eastern')
open_time = time(9, 30, tzinfo=est)

TimeSensor(
    task_id='wait_market_open',
    target_time=open_time,
    timeout=3600  # 超时1小时
)

常见问题[编辑 | 编辑源代码]

Q1: 与TimeDeltaSensor的区别?[编辑 | 编辑源代码]

  • `TimeSensor` 监测绝对时间(如"每天14:00")
  • `TimeDeltaSensor` 监测相对时间(如"等待2小时后")

Q2: 如何避免无限等待?[编辑 | 编辑源代码]

通过设置 `timeout` 参数:

TimeSensor(
    task_id='safe_wait',
    target_time=time(23, 59),
    timeout=300  # 5分钟后超时失败
)

数学原理[编辑 | 编辑源代码]

传感器通过循环检查满足时间条件: {continueif TnowTtargetwait Δtotherwise 其中 Δt 为 `poke_interval`。

性能优化[编辑 | 编辑源代码]

  • 对于高频检查(如每分钟),建议减小 `poke_interval`
  • 长期等待任务应考虑使用 `TriggerDagRunOperator` 替代