跳转到内容

Airflow自定义Sensor

来自代码酷

Airflow自定义Sensor[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow自定义Sensor是Apache Airflow中用于扩展监控能力的核心组件,允许用户根据业务需求创建专用的等待条件。Sensor继承自BaseSensorOperator,通过定期检查外部系统状态(如文件生成、API响应或数据库记录)来触发下游任务执行。与内置Sensor相比,自定义Sensor提供了更高的灵活性和业务适配性。

核心原理[编辑 | 编辑源代码]

自定义Sensor需实现以下关键方法:

  • poke(self, context):执行状态检查的逻辑单元,返回布尔值表示条件是否满足
  • mode:设置检查模式(pokereschedule
  • timeout/poke_interval:控制检查频率和超时行为

classDiagram BaseSensorOperator <|-- CustomSensor class BaseSensorOperator { +poke(context) +execute(context) +mode +timeout } class CustomSensor { +poke(context): bool +soft_fail: bool }

创建自定义Sensor[编辑 | 编辑源代码]

以下示例实现检查HDFS文件是否存在的Sensor:

from airflow.sensors.base import BaseSensorOperator
from airflow.providers.apache.hdfs.hooks.hdfs import HDFSHook

class HdfsFileSensor(BaseSensorOperator):
    """
    自定义HDFS文件检查Sensor
    :param hdfs_conn_id: HDFS连接ID
    :param filepath: 要检查的文件路径
    """
    template_fields = ('filepath',)

    def __init__(self, hdfs_conn_id='hdfs_default', filepath=None, **kwargs):
        super().__init__(**kwargs)
        self.hdfs_conn_id = hdfs_conn_id
        self.filepath = filepath

    def poke(self, context):
        hook = HDFSHook(self.hdfs_conn_id)
        client = hook.get_conn()
        self.log.info(f'Checking HDFS path: {self.filepath}')
        return client.status(self.filepath, strict=False) is not None

参数说明:

  • template_fields:支持宏替换的字段
  • strict=False:文件不存在时不抛出异常

使用案例[编辑 | 编辑源代码]

在DAG中调用自定义Sensor:

with DAG('custom_sensor_demo', schedule_interval='@daily') as dag:
    wait_for_data = HdfsFileSensor(
        task_id='wait_for_hdfs_file',
        filepath='/data/{{ ds }}/input.csv',
        mode='reschedule',
        timeout=3600,
        poke_interval=300
    )

    process_data = PythonOperator(
        task_id='process_data',
        python_callable=data_processing_func
    )

    wait_for_data >> process_data

行为说明: 1. 每5分钟检查一次HDFS文件 2. 若1小时内未检测到文件,任务失败 3. 使用reschedule模式释放工作线程资源

高级配置[编辑 | 编辑源代码]

指数退避检查[编辑 | 编辑源代码]

通过继承Sensor实现智能检查间隔:

import math

class SmartSensor(BaseSensorOperator):
    def __init__(self, initial_interval=60, max_interval=600, **kwargs):
        super().__init__(**kwargs)
        self.initial_interval = initial_interval
        self.max_interval = max_interval
        self.current_attempt = 0

    def poke(self, context):
        self.current_attempt += 1
        interval = min(
            self.initial_interval * math.pow(1.5, self.current_attempt),
            self.max_interval
        )
        self.poke_interval = interval
        return self._check_condition(context)

异步Sensor[编辑 | 编辑源代码]

对于长时间检查操作,可使用异步模式:

from airflow.triggers.base import BaseTrigger

class AsyncFileTrigger(BaseTrigger):
    def __init__(self, filepath, poll_interval=60):
        self.filepath = filepath
        self.poll_interval = poll_interval

    async def run(self):
        while True:
            if os.path.exists(self.filepath):
                yield TriggerEvent(True)
            await asyncio.sleep(self.poll_interval)

class AsyncFileSensor(BaseSensorOperator):
    def execute(self, context):
        self.defer(
            trigger=AsyncFileTrigger(filepath=self.filepath),
            method_name='execute_complete'
        )

    def execute_complete(self, context, event=None):
        if event:
            return True
        return False

最佳实践[编辑 | 编辑源代码]

1. 超时设置:根据业务SLA合理配置timeout 2. 资源优化:优先使用mode='reschedule' 3. 幂等设计:确保多次poke检查结果一致 4. 日志记录:在poke()中添加详细状态日志 5. 测试策略:使用Airflow的测试命令验证Sensor行为:

   airflow tasks test your_dag_id your_sensor_task_id execution_date

性能考量[编辑 | 编辑源代码]

Sensor性能可通过以下公式估算最大检查次数: N=Tt 其中:

  • T = timeout (秒)
  • t = poke_interval (秒)

常见问题[编辑 | 编辑源代码]

问题 解决方案
Sensor卡住不超时 检查mode是否为'poke'且工作线程可用
资源占用过高 改用reschedule模式或增加poke_interval
时区问题 确保所有时间参数使用UTC

扩展阅读[编辑 | 编辑源代码]

  • 继承关系:BaseOperator → BaseSensorOperator → CustomSensor
  • 内置Sensor参考:FileSensor、HttpSensor、SqlSensor
  • 高级模式:使用Triggerer实现完全异步Sensor