跳转到内容

Airflow Sensors基础

来自代码酷

Airflow Sensors基础[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow Sensors 是 Apache Airflow 中的一种特殊运算符(Operator),用于监控外部系统或条件,并在满足特定条件时触发任务执行。与常规运算符不同,Sensors 会持续检查条件(例如文件是否存在、数据库记录是否更新等),直到条件为真或超时。Sensors 是构建依赖外部事件的自动化工作流(DAG)的关键组件。

核心特性[编辑 | 编辑源代码]

  • 阻塞模式:Sensor 会暂停当前任务,直到条件满足。
  • 超时机制:可设置最长等待时间(`timeout`)和检查间隔(`poke_interval`)。
  • 可扩展性:支持自定义 Sensor 以满足特定需求。

常用内置 Sensors[编辑 | 编辑源代码]

以下是 Airflow 提供的内置 Sensors 示例:

常用 Sensors 列表
Sensor 名称 功能描述 参数示例
监控文件或目录是否存在 | `filepath="/data/file.txt"`
检查 SQL 查询结果是否为真 | `sql="SELECT COUNT(*) FROM table"`
验证 HTTP 端点是否返回预期响应 | `endpoint="api/status"`
等待其他 DAG 的任务完成 | `external_dag_id="target_dag"`

代码示例[编辑 | 编辑源代码]

以下是一个使用 `FileSensor` 的完整 DAG 示例:

from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime

with DAG(
    dag_id="file_sensor_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    
    start_task = DummyOperator(task_id="start")
    
    # 监控文件是否出现,每30秒检查一次,超时1小时
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/data/input.csv",
        poke_interval=30,
        timeout=3600,
        mode="poke"  # 默认模式,持续检查
    )
    
    process_file = DummyOperator(task_id="process_file")
    
    start_task >> wait_for_file >> process_file

输出说明

  • 若文件在1小时内出现 → 触发 `process_file` 任务
  • 若超时未检测到文件 → 任务标记为`失败`

模式选择[编辑 | 编辑源代码]

Sensors 支持两种运行模式: 1. poke模式(默认):持续检查,占用工作线程资源 2. reschedule模式:释放工作线程,下次检查时重新调度

graph LR A[开始] --> B{模式选择} B -->|poke| C[持续占用线程] B -->|reschedule| D[释放线程等待下次调度]

自定义 Sensor 开发[编辑 | 编辑源代码]

通过继承 `BaseSensorOperator` 实现自定义逻辑:

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

class CustomTableExistsSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, table_name, **kwargs):
        super().__init__(**kwargs)
        self.table_name = table_name

    def poke(self, context):
        # 实现检查逻辑
        return check_table_exists(self.table_name)  # 返回布尔值

最佳实践[编辑 | 编辑源代码]

1. 设置合理的超时:避免无限等待 2. 使用 reschedule 模式:减少资源占用(适合长时间间隔检查) 3. 错误处理:通过 `soft_fail=True` 使超时标记为`跳过`而非`失败` 4. 测试策略:使用 `airflow tasks test` 命令本地验证

数学表达[编辑 | 编辑源代码]

Sensor 的检查次数计算公式: n=timeoutpoke_interval 其中:

  • n = 最大检查次数
  • timeout = 总超时时间(秒)
  • poke_interval = 检查间隔(秒)

实际案例[编辑 | 编辑源代码]

场景:ETL 管道依赖上游系统生成数据文件 解决方案: 1. 使用 `FileSensor` 监控 `/data/inputs/` 目录 2. 文件到达后触发 Spark 作业处理数据 3. 通过 `ExternalTaskSensor` 等待后续质量检查 DAG 完成

常见问题[编辑 | 编辑源代码]

Q: Sensor 卡住不退出怎么办? A: 检查: - 是否正确实现 `poke()` 方法(自定义 Sensor) - 超时设置是否足够 - 外部系统是否可达

Q: 如何提高监控精度? A: 减小 `poke_interval`,但需权衡系统负载