跳转到内容

Airflow Sensor模式

来自代码酷

Airflow Sensor模式[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow Sensor模式是Apache Airflow中一种特殊的操作符(Operator),用于持续监测外部系统状态,直到满足特定条件后才触发后续任务。传感器(Sensor)通过轮询或异步机制检查目标状态(如文件是否存在、数据库记录是否更新等),常用于工作流中对依赖条件的监控。与普通Operator不同,Sensor会阻塞任务执行直到条件满足或超时。

Sensor的核心特性包括:

  • 阻塞性:任务停留在运行状态,直到条件满足。
  • 可配置超时:避免无限等待,通过`timeout`参数控制最长等待时间。
  • 多种模式:包括硬性等待(poke模式)和延迟重试(reschedule模式)。

工作原理[编辑 | 编辑源代码]

Sensor通过以下两种模式运行:

1. Poke模式(默认)[编辑 | 编辑源代码]

传感器在同一个工作进程中持续轮询(默认间隔60秒),占用任务槽(slot)直到条件满足。

graph TD A[开始] --> B{条件满足?} B -- 否 --> C[等待poke_interval] C --> B B -- 是 --> D[触发下游任务]

2. Reschedule模式[编辑 | 编辑源代码]

传感器每次检查后释放任务槽,并在下次检查时重新调度。适合长时间等待的场景,减少资源占用。

graph TD A[开始] --> B{条件满足?} B -- 否 --> C[释放任务槽并计划下次检查] C --> D[等待mode.reschedule] D --> B B -- 是 --> E[触发下游任务]

代码示例[编辑 | 编辑源代码]

以下示例展示如何定义一个监测文件是否存在的`FileSensor`:

  
from airflow import DAG  
from airflow.sensors.filesystem import FileSensor  
from datetime import datetime  

with DAG('file_sensor_example', start_date=datetime(2023, 1, 1)) as dag:  
    wait_for_file = FileSensor(  
        task_id='wait_for_file',  
        filepath='/data/example.txt',  
        mode='poke',  # 或 'reschedule'  
        poke_interval=30,  # 检查间隔(秒)  
        timeout=3600,  # 超时时间(秒)  
    )

输出说明

  • 若文件在1小时内出现,任务标记为`success`并触发下游任务。
  • 若超时未满足条件,任务标记为`failed`。

实际应用场景[编辑 | 编辑源代码]

场景1:数据管道依赖检查[编辑 | 编辑源代码]

监测上游系统是否生成数据文件,确保ETL任务仅在数据就绪时执行。

  
from airflow.sensors.external_task import ExternalTaskSensor  

wait_for_upstream = ExternalTaskSensor(  
    task_id='wait_for_upstream',  
    external_dag_id='upstream_dag',  
    external_task_id='load_data',  
    mode='reschedule',  
    timeout=86400  # 最长等待24小时  
)

场景2:API响应监测[编辑 | 编辑源代码]

使用`HttpSensor`检查API服务是否可用:

  
from airflow.sensors.http_sensor import HttpSensor  

check_api = HttpSensor(  
    task_id='check_api',  
    endpoint='/health',  
    response_check=lambda response: response.status_code == 200,  
    mode='poke',  
    poke_interval=10  
)

高级配置[编辑 | 编辑源代码]

自定义Sensor[编辑 | 编辑源代码]

继承`BaseSensorOperator`实现自定义逻辑:

  
from airflow.sensors.base import BaseSensorOperator  

class CustomSensor(BaseSensorOperator):  
    def poke(self, context):  
        return check_custom_condition()  # 返回True/False  

custom_sensor = CustomSensor(task_id='custom_sensor')

超时与指数退避[编辑 | 编辑源代码]

通过`exponential_backoff`参数优化轮询效率:

解析失败 (语法错误): {\displaystyle \text{wait\_time} = \text{poke\_interval} \times 2^{(\text{retry\_count} - 1)} }

  
FileSensor(  
    ...,  
    exponential_backoff=True,  # 启用指数退避  
    poke_interval=10  
)

最佳实践[编辑 | 编辑源代码]

  • 短时检查(<5分钟)使用`poke`模式。
  • 长时等待(>5分钟)使用`reschedule`模式以释放资源。
  • 为`timeout`设置合理值,避免任务卡死。
  • 在Sensor任务添加描述(`doc_md`)说明检查条件。

常见问题[编辑 | 编辑源代码]

Q: Sensor卡住不退出怎么办? A: 检查`timeout`是否过小,或条件逻辑是否永远无法满足。

Q: 如何减少Sensor资源消耗? A: 增大`poke_interval`或切换至`reschedule`模式。