Airflow Sensor模式
外观
Airflow Sensor模式[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow Sensor模式是Apache Airflow中一种特殊的操作符(Operator),用于持续监测外部系统状态,直到满足特定条件后才触发后续任务。传感器(Sensor)通过轮询或异步机制检查目标状态(如文件是否存在、数据库记录是否更新等),常用于工作流中对依赖条件的监控。与普通Operator不同,Sensor会阻塞任务执行直到条件满足或超时。
Sensor的核心特性包括:
- 阻塞性:任务停留在运行状态,直到条件满足。
- 可配置超时:避免无限等待,通过`timeout`参数控制最长等待时间。
- 多种模式:包括硬性等待(poke模式)和延迟重试(reschedule模式)。
工作原理[编辑 | 编辑源代码]
Sensor通过以下两种模式运行:
1. Poke模式(默认)[编辑 | 编辑源代码]
传感器在同一个工作进程中持续轮询(默认间隔60秒),占用任务槽(slot)直到条件满足。
2. Reschedule模式[编辑 | 编辑源代码]
传感器每次检查后释放任务槽,并在下次检查时重新调度。适合长时间等待的场景,减少资源占用。
代码示例[编辑 | 编辑源代码]
以下示例展示如何定义一个监测文件是否存在的`FileSensor`:
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from datetime import datetime
with DAG('file_sensor_example', start_date=datetime(2023, 1, 1)) as dag:
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/data/example.txt',
mode='poke', # 或 'reschedule'
poke_interval=30, # 检查间隔(秒)
timeout=3600, # 超时时间(秒)
)
输出说明:
- 若文件在1小时内出现,任务标记为`success`并触发下游任务。
- 若超时未满足条件,任务标记为`failed`。
实际应用场景[编辑 | 编辑源代码]
场景1:数据管道依赖检查[编辑 | 编辑源代码]
监测上游系统是否生成数据文件,确保ETL任务仅在数据就绪时执行。
from airflow.sensors.external_task import ExternalTaskSensor
wait_for_upstream = ExternalTaskSensor(
task_id='wait_for_upstream',
external_dag_id='upstream_dag',
external_task_id='load_data',
mode='reschedule',
timeout=86400 # 最长等待24小时
)
场景2:API响应监测[编辑 | 编辑源代码]
使用`HttpSensor`检查API服务是否可用:
from airflow.sensors.http_sensor import HttpSensor
check_api = HttpSensor(
task_id='check_api',
endpoint='/health',
response_check=lambda response: response.status_code == 200,
mode='poke',
poke_interval=10
)
高级配置[编辑 | 编辑源代码]
自定义Sensor[编辑 | 编辑源代码]
继承`BaseSensorOperator`实现自定义逻辑:
from airflow.sensors.base import BaseSensorOperator
class CustomSensor(BaseSensorOperator):
def poke(self, context):
return check_custom_condition() # 返回True/False
custom_sensor = CustomSensor(task_id='custom_sensor')
超时与指数退避[编辑 | 编辑源代码]
通过`exponential_backoff`参数优化轮询效率:
解析失败 (语法错误): {\displaystyle \text{wait\_time} = \text{poke\_interval} \times 2^{(\text{retry\_count} - 1)} }
FileSensor(
...,
exponential_backoff=True, # 启用指数退避
poke_interval=10
)
最佳实践[编辑 | 编辑源代码]
- 短时检查(<5分钟)使用`poke`模式。
- 长时等待(>5分钟)使用`reschedule`模式以释放资源。
- 为`timeout`设置合理值,避免任务卡死。
- 在Sensor任务添加描述(`doc_md`)说明检查条件。
常见问题[编辑 | 编辑源代码]
Q: Sensor卡住不退出怎么办? A: 检查`timeout`是否过小,或条件逻辑是否永远无法满足。
Q: 如何减少Sensor资源消耗? A: 增大`poke_interval`或切换至`reschedule`模式。