Airflow TimeSensor
外观
Airflow TimeSensor[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow TimeSensor 是 Apache Airflow 中的一种特殊传感器(Sensor),用于等待特定时间点到达后再继续执行后续任务。与基于时间间隔的等待(如 `TimeDeltaSensor`)不同,`TimeSensor` 直接监测一个绝对时间戳,常用于调度需要精确时间触发的任务。
核心概念[编辑 | 编辑源代码]
- 绝对时间触发:等待直到系统时间达到指定的 `target_time`。
- 非阻塞检查:通过 `poke_interval` 参数控制检查频率,避免资源浪费。
- 时区处理:支持带时区的 `datetime` 对象,确保跨时区调度准确性。
基本用法[编辑 | 编辑源代码]
以下示例展示如何在 DAG 中使用 `TimeSensor`:
from airflow import DAG
from airflow.sensors.time_sensor import TimeSensor
from datetime import datetime, time
with DAG('time_sensor_example', start_date=datetime(2023, 1, 1)) as dag:
# 等待直到当天14:30:00
wait_for_time = TimeSensor(
task_id='wait_for_1430',
target_time=time(14, 30),
)
参数详解[编辑 | 编辑源代码]
参数 | 类型 | 说明 |
---|---|---|
`datetime.time` | 必须参数,指定目标时间(时:分:秒) | ||
`int` | 可选,检查间隔秒数(默认60) | ||
`int` | 可选,超时秒数(默认7天) |
高级特性[编辑 | 编辑源代码]
时区敏感示例[编辑 | 编辑源代码]
处理跨时区场景时需显式指定时区:
from pendulum import timezone
tz = timezone("Europe/Paris")
target_time = time(12, 0, tzinfo=tz)
动态时间计算[编辑 | 编辑源代码]
结合 `PythonOperator` 动态生成目标时间:
def _calculate_target_time(**context):
execution_date = context['execution_date']
return execution_date.add(hours=3).time()
TimeSensor(
task_id='dynamic_time',
target_time=_calculate_target_time,
)
实际案例[编辑 | 编辑源代码]
场景:跨系统时间同步[编辑 | 编辑源代码]
某金融系统需在交易所开盘(09:30 EST)后启动数据处理流程:
代码实现[编辑 | 编辑源代码]
from pytz import timezone
est = timezone('US/Eastern')
open_time = time(9, 30, tzinfo=est)
TimeSensor(
task_id='wait_market_open',
target_time=open_time,
timeout=3600 # 超时1小时
)
常见问题[编辑 | 编辑源代码]
Q1: 与TimeDeltaSensor的区别?[编辑 | 编辑源代码]
- `TimeSensor` 监测绝对时间(如"每天14:00")
- `TimeDeltaSensor` 监测相对时间(如"等待2小时后")
Q2: 如何避免无限等待?[编辑 | 编辑源代码]
通过设置 `timeout` 参数:
TimeSensor(
task_id='safe_wait',
target_time=time(23, 59),
timeout=300 # 5分钟后超时失败
)
数学原理[编辑 | 编辑源代码]
传感器通过循环检查满足时间条件: 其中 为 `poke_interval`。
性能优化[编辑 | 编辑源代码]
- 对于高频检查(如每分钟),建议减小 `poke_interval`
- 长期等待任务应考虑使用 `TriggerDagRunOperator` 替代