跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow TimeSensor
”︁
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow TimeSensor = == 介绍 == '''Airflow TimeSensor''' 是 Apache Airflow 中的一种特殊传感器(Sensor),用于等待特定时间点到达后再继续执行后续任务。与基于时间间隔的等待(如 `TimeDeltaSensor`)不同,`TimeSensor` 直接监测一个绝对时间戳,常用于调度需要精确时间触发的任务。 == 核心概念 == * '''绝对时间触发''':等待直到系统时间达到指定的 `target_time`。 * '''非阻塞检查''':通过 `poke_interval` 参数控制检查频率,避免资源浪费。 * '''时区处理''':支持带时区的 `datetime` 对象,确保跨时区调度准确性。 == 基本用法 == 以下示例展示如何在 DAG 中使用 `TimeSensor`: <syntaxhighlight lang="python"> from airflow import DAG from airflow.sensors.time_sensor import TimeSensor from datetime import datetime, time with DAG('time_sensor_example', start_date=datetime(2023, 1, 1)) as dag: # 等待直到当天14:30:00 wait_for_time = TimeSensor( task_id='wait_for_1430', target_time=time(14, 30), ) </syntaxhighlight> === 参数详解 === {| class="wikitable" |- ! 参数 !! 类型 !! 说明 |- | `target_time` | `datetime.time` | 必须参数,指定目标时间(时:分:秒) |- | `poke_interval` | `int` | 可选,检查间隔秒数(默认60) |- | `timeout` | `int` | 可选,超时秒数(默认7天) |} == 高级特性 == === 时区敏感示例 === 处理跨时区场景时需显式指定时区: <syntaxhighlight lang="python"> from pendulum import timezone tz = timezone("Europe/Paris") target_time = time(12, 0, tzinfo=tz) </syntaxhighlight> === 动态时间计算 === 结合 `PythonOperator` 动态生成目标时间: <syntaxhighlight lang="python"> def _calculate_target_time(**context): execution_date = context['execution_date'] return execution_date.add(hours=3).time() TimeSensor( task_id='dynamic_time', target_time=_calculate_target_time, ) </syntaxhighlight> == 实际案例 == === 场景:跨系统时间同步 === 某金融系统需在交易所开盘(09:30 EST)后启动数据处理流程: <mermaid> graph LR A[TimeSensor: 09:30 EST] --> B[下载市场数据] B --> C[数据清洗] </mermaid> === 代码实现 === <syntaxhighlight lang="python"> from pytz import timezone est = timezone('US/Eastern') open_time = time(9, 30, tzinfo=est) TimeSensor( task_id='wait_market_open', target_time=open_time, timeout=3600 # 超时1小时 ) </syntaxhighlight> == 常见问题 == === Q1: 与TimeDeltaSensor的区别? === * `TimeSensor` 监测'''绝对时间'''(如"每天14:00") * `TimeDeltaSensor` 监测'''相对时间'''(如"等待2小时后") === Q2: 如何避免无限等待? === 通过设置 `timeout` 参数: <syntaxhighlight lang="python"> TimeSensor( task_id='safe_wait', target_time=time(23, 59), timeout=300 # 5分钟后超时失败 ) </syntaxhighlight> == 数学原理 == 传感器通过循环检查满足时间条件: <math> \begin{cases} \text{continue} & \text{if } T_{now} \geq T_{target}\\ \text{wait } \Delta t & \text{otherwise} \end{cases} </math> 其中 <math>\Delta t</math> 为 `poke_interval`。 == 性能优化 == * 对于高频检查(如每分钟),建议减小 `poke_interval` * 长期等待任务应考虑使用 `TriggerDagRunOperator` 替代 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow Sensors应用]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)