跳转到内容

Airflow Sensor最佳实践

来自代码酷

Airflow Sensor最佳实践[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow Sensors是Apache Airflow中的一种特殊算子,用于监控外部系统状态并等待特定条件满足后触发下游任务。与常规Operator不同,Sensor会持续检查条件(通过轮询机制),直到超时或条件满足。本指南将详细介绍传感器的工作原理、核心参数配置和实际应用场景。

核心概念[编辑 | 编辑源代码]

基本工作原理[编辑 | 编辑源代码]

Sensor通过继承BaseSensorOperator实现,其核心行为包括:

  • 轮询间隔poke_interval):默认60秒
  • 超时时间timeout):默认7天
  • 执行模式:支持poke(默认)和reschedule模式

graph LR A[开始] --> B{条件满足?} B -- 否 --> C[等待poke_interval] C --> B B -- 是 --> D[触发下游任务]

关键参数对比[编辑 | 编辑源代码]

传感器模式对比
参数 poke模式 reschedule模式
资源占用 持续占用Worker插槽 检查时占用
适用场景 快速完成的条件 长时间等待的条件
重试机制 需手动配置 自动释放资源

代码示例[编辑 | 编辑源代码]

基础文件传感器[编辑 | 编辑源代码]

from airflow.sensors.filesystem import FileSensor

file_sensor = FileSensor(
    task_id="wait_for_data",
    filepath="/data/input.csv",
    poke_interval=30,  # 每30秒检查一次
    timeout=3600,      # 超时1小时
    mode="reschedule"  # 资源优化模式
)

输出说明

  • 文件存在时:立即触发下游任务
  • 1小时内未检测到文件:任务标记为失败

自定义传感器开发[编辑 | 编辑源代码]

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

class DatabaseRecordSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, conn_id, sql, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.sql = sql

    def poke(self, context):
        hook = PostgresHook(postgres_conn_id=self.conn_id)
        records = hook.get_records(self.sql)
        return len(records) > 0

最佳实践[编辑 | 编辑源代码]

性能优化[编辑 | 编辑源代码]

1. 合理设置超时:根据业务需求设置timeout最大预期等待时间 2. 动态轮询:对高频检查使用短间隔(如5秒),低频检查使用长间隔(如300秒) 3. 模式选择

  * 短期等待(<30分钟):poke模式
  * 长期等待:reschedule模式

错误处理[编辑 | 编辑源代码]

  • 实现soft_fail=True将超时转为跳过而非失败
  • 使用on_retry_callback记录检查历史
  • 结合retriesretry_delay实现重试机制

实际案例[编辑 | 编辑源代码]

数据管道场景[编辑 | 编辑源代码]

需求:当Hive分区出现后启动ETL作业

from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor

partition_sensor = HivePartitionSensor(
    task_id="check_hive_data",
    partition="ds={{ ds }}",
    metastore_conn_id="hive_metastore",
    poke_interval=5*60,  # 5分钟检查一次
    timeout=6*60*60      # 6小时超时
)

跨系统集成[编辑 | 编辑源代码]

需求:等待API服务就绪

graph TD A[API Sensor] -->|服务UP| B[数据下载] B --> C[数据处理] C --> D[结果上传]

常见问题[编辑 | 编辑源代码]

Q1: 传感器卡住工作队列怎么办?[编辑 | 编辑源代码]

  • 解决方案:改用reschedule模式或增加pool_slots

Q2: 如何测试传感器逻辑?[编辑 | 编辑源代码]

  • 使用airflow tasks test命令模拟运行
  • 在单元测试中mockpoke()方法

Q3: 条件永远不满足时如何处理?[编辑 | 编辑源代码]

  • 设置timeoutemail_on_failure
  • 结合ShortCircuitOperator实现条件分支

进阶技巧[编辑 | 编辑源代码]

  • 使用Sensor+TriggerRule实现复杂依赖
  • 通过XCom传递传感器检测结果
  • 在Kubernetes环境中使用KubernetesPodOperator+传感器混合部署