Airflow自定义Sensor
外观
Airflow自定义Sensor[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow自定义Sensor是Apache Airflow中用于扩展监控能力的核心组件,允许用户根据业务需求创建专用的等待条件。Sensor继承自BaseSensorOperator,通过定期检查外部系统状态(如文件生成、API响应或数据库记录)来触发下游任务执行。与内置Sensor相比,自定义Sensor提供了更高的灵活性和业务适配性。
核心原理[编辑 | 编辑源代码]
自定义Sensor需实现以下关键方法:
poke(self, context)
:执行状态检查的逻辑单元,返回布尔值表示条件是否满足mode
:设置检查模式(poke
或reschedule
)timeout
/poke_interval
:控制检查频率和超时行为
创建自定义Sensor[编辑 | 编辑源代码]
以下示例实现检查HDFS文件是否存在的Sensor:
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.apache.hdfs.hooks.hdfs import HDFSHook
class HdfsFileSensor(BaseSensorOperator):
"""
自定义HDFS文件检查Sensor
:param hdfs_conn_id: HDFS连接ID
:param filepath: 要检查的文件路径
"""
template_fields = ('filepath',)
def __init__(self, hdfs_conn_id='hdfs_default', filepath=None, **kwargs):
super().__init__(**kwargs)
self.hdfs_conn_id = hdfs_conn_id
self.filepath = filepath
def poke(self, context):
hook = HDFSHook(self.hdfs_conn_id)
client = hook.get_conn()
self.log.info(f'Checking HDFS path: {self.filepath}')
return client.status(self.filepath, strict=False) is not None
参数说明:
template_fields
:支持宏替换的字段strict=False
:文件不存在时不抛出异常
使用案例[编辑 | 编辑源代码]
在DAG中调用自定义Sensor:
with DAG('custom_sensor_demo', schedule_interval='@daily') as dag:
wait_for_data = HdfsFileSensor(
task_id='wait_for_hdfs_file',
filepath='/data/{{ ds }}/input.csv',
mode='reschedule',
timeout=3600,
poke_interval=300
)
process_data = PythonOperator(
task_id='process_data',
python_callable=data_processing_func
)
wait_for_data >> process_data
行为说明:
1. 每5分钟检查一次HDFS文件
2. 若1小时内未检测到文件,任务失败
3. 使用reschedule
模式释放工作线程资源
高级配置[编辑 | 编辑源代码]
指数退避检查[编辑 | 编辑源代码]
通过继承Sensor
实现智能检查间隔:
import math
class SmartSensor(BaseSensorOperator):
def __init__(self, initial_interval=60, max_interval=600, **kwargs):
super().__init__(**kwargs)
self.initial_interval = initial_interval
self.max_interval = max_interval
self.current_attempt = 0
def poke(self, context):
self.current_attempt += 1
interval = min(
self.initial_interval * math.pow(1.5, self.current_attempt),
self.max_interval
)
self.poke_interval = interval
return self._check_condition(context)
异步Sensor[编辑 | 编辑源代码]
对于长时间检查操作,可使用异步模式:
from airflow.triggers.base import BaseTrigger
class AsyncFileTrigger(BaseTrigger):
def __init__(self, filepath, poll_interval=60):
self.filepath = filepath
self.poll_interval = poll_interval
async def run(self):
while True:
if os.path.exists(self.filepath):
yield TriggerEvent(True)
await asyncio.sleep(self.poll_interval)
class AsyncFileSensor(BaseSensorOperator):
def execute(self, context):
self.defer(
trigger=AsyncFileTrigger(filepath=self.filepath),
method_name='execute_complete'
)
def execute_complete(self, context, event=None):
if event:
return True
return False
最佳实践[编辑 | 编辑源代码]
1. 超时设置:根据业务SLA合理配置timeout
2. 资源优化:优先使用mode='reschedule'
3. 幂等设计:确保多次poke检查结果一致
4. 日志记录:在poke()中添加详细状态日志
5. 测试策略:使用Airflow的测试命令验证Sensor行为:
airflow tasks test your_dag_id your_sensor_task_id execution_date
性能考量[编辑 | 编辑源代码]
Sensor性能可通过以下公式估算最大检查次数: 其中:
- = timeout (秒)
- = poke_interval (秒)
常见问题[编辑 | 编辑源代码]
问题 | 解决方案 |
---|---|
Sensor卡住不超时 | 检查mode是否为'poke'且工作线程可用 |
资源占用过高 | 改用reschedule模式或增加poke_interval |
时区问题 | 确保所有时间参数使用UTC |
扩展阅读[编辑 | 编辑源代码]
- 继承关系:BaseOperator → BaseSensorOperator → CustomSensor
- 内置Sensor参考:FileSensor、HttpSensor、SqlSensor
- 高级模式:使用Triggerer实现完全异步Sensor