Airflow Sensor最佳实践
外观
Airflow Sensor最佳实践[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow Sensors是Apache Airflow中的一种特殊算子,用于监控外部系统状态并等待特定条件满足后触发下游任务。与常规Operator不同,Sensor会持续检查条件(通过轮询机制),直到超时或条件满足。本指南将详细介绍传感器的工作原理、核心参数配置和实际应用场景。
核心概念[编辑 | 编辑源代码]
基本工作原理[编辑 | 编辑源代码]
Sensor通过继承BaseSensorOperator
实现,其核心行为包括:
- 轮询间隔(
poke_interval
):默认60秒 - 超时时间(
timeout
):默认7天 - 执行模式:支持
poke
(默认)和reschedule
模式
关键参数对比[编辑 | 编辑源代码]
参数 | poke模式 | reschedule模式 |
---|---|---|
资源占用 | 持续占用Worker插槽 | 检查时占用 |
适用场景 | 快速完成的条件 | 长时间等待的条件 |
重试机制 | 需手动配置 | 自动释放资源 |
代码示例[编辑 | 编辑源代码]
基础文件传感器[编辑 | 编辑源代码]
from airflow.sensors.filesystem import FileSensor
file_sensor = FileSensor(
task_id="wait_for_data",
filepath="/data/input.csv",
poke_interval=30, # 每30秒检查一次
timeout=3600, # 超时1小时
mode="reschedule" # 资源优化模式
)
输出说明:
- 文件存在时:立即触发下游任务
- 1小时内未检测到文件:任务标记为失败
自定义传感器开发[编辑 | 编辑源代码]
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class DatabaseRecordSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, sql, *args, **kwargs):
super().__init__(*args, **kwargs)
self.conn_id = conn_id
self.sql = sql
def poke(self, context):
hook = PostgresHook(postgres_conn_id=self.conn_id)
records = hook.get_records(self.sql)
return len(records) > 0
最佳实践[编辑 | 编辑源代码]
性能优化[编辑 | 编辑源代码]
1. 合理设置超时:根据业务需求设置 2. 动态轮询:对高频检查使用短间隔(如5秒),低频检查使用长间隔(如300秒) 3. 模式选择:
* 短期等待(<30分钟):poke模式 * 长期等待:reschedule模式
错误处理[编辑 | 编辑源代码]
- 实现
soft_fail=True
将超时转为跳过而非失败 - 使用
on_retry_callback
记录检查历史 - 结合
retries
和retry_delay
实现重试机制
实际案例[编辑 | 编辑源代码]
数据管道场景[编辑 | 编辑源代码]
需求:当Hive分区出现后启动ETL作业
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor
partition_sensor = HivePartitionSensor(
task_id="check_hive_data",
partition="ds={{ ds }}",
metastore_conn_id="hive_metastore",
poke_interval=5*60, # 5分钟检查一次
timeout=6*60*60 # 6小时超时
)
跨系统集成[编辑 | 编辑源代码]
需求:等待API服务就绪
常见问题[编辑 | 编辑源代码]
Q1: 传感器卡住工作队列怎么办?[编辑 | 编辑源代码]
- 解决方案:改用reschedule模式或增加
pool_slots
Q2: 如何测试传感器逻辑?[编辑 | 编辑源代码]
- 使用
airflow tasks test
命令模拟运行 - 在单元测试中mock
poke()
方法
Q3: 条件永远不满足时如何处理?[编辑 | 编辑源代码]
- 设置
timeout
和email_on_failure
- 结合
ShortCircuitOperator
实现条件分支
进阶技巧[编辑 | 编辑源代码]
- 使用
Sensor
+TriggerRule
实现复杂依赖 - 通过
XCom
传递传感器检测结果 - 在Kubernetes环境中使用
KubernetesPodOperator
+传感器混合部署