Airflow Sensors基础
Airflow Sensors基础[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow Sensors 是 Apache Airflow 中的一种特殊运算符(Operator),用于监控外部系统或条件,并在满足特定条件时触发任务执行。与常规运算符不同,Sensors 会持续检查条件(例如文件是否存在、数据库记录是否更新等),直到条件为真或超时。Sensors 是构建依赖外部事件的自动化工作流(DAG)的关键组件。
核心特性[编辑 | 编辑源代码]
- 阻塞模式:Sensor 会暂停当前任务,直到条件满足。
- 超时机制:可设置最长等待时间(`timeout`)和检查间隔(`poke_interval`)。
- 可扩展性:支持自定义 Sensor 以满足特定需求。
常用内置 Sensors[编辑 | 编辑源代码]
以下是 Airflow 提供的内置 Sensors 示例:
Sensor 名称 | 功能描述 | 参数示例 |
---|---|---|
监控文件或目录是否存在 | `filepath="/data/file.txt"` | ||
检查 SQL 查询结果是否为真 | `sql="SELECT COUNT(*) FROM table"` | ||
验证 HTTP 端点是否返回预期响应 | `endpoint="api/status"` | ||
等待其他 DAG 的任务完成 | `external_dag_id="target_dag"` |
代码示例[编辑 | 编辑源代码]
以下是一个使用 `FileSensor` 的完整 DAG 示例:
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime
with DAG(
dag_id="file_sensor_example",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
) as dag:
start_task = DummyOperator(task_id="start")
# 监控文件是否出现,每30秒检查一次,超时1小时
wait_for_file = FileSensor(
task_id="wait_for_file",
filepath="/data/input.csv",
poke_interval=30,
timeout=3600,
mode="poke" # 默认模式,持续检查
)
process_file = DummyOperator(task_id="process_file")
start_task >> wait_for_file >> process_file
输出说明:
- 若文件在1小时内出现 → 触发 `process_file` 任务
- 若超时未检测到文件 → 任务标记为`失败`
模式选择[编辑 | 编辑源代码]
Sensors 支持两种运行模式: 1. poke模式(默认):持续检查,占用工作线程资源 2. reschedule模式:释放工作线程,下次检查时重新调度
自定义 Sensor 开发[编辑 | 编辑源代码]
通过继承 `BaseSensorOperator` 实现自定义逻辑:
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class CustomTableExistsSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, table_name, **kwargs):
super().__init__(**kwargs)
self.table_name = table_name
def poke(self, context):
# 实现检查逻辑
return check_table_exists(self.table_name) # 返回布尔值
最佳实践[编辑 | 编辑源代码]
1. 设置合理的超时:避免无限等待 2. 使用 reschedule 模式:减少资源占用(适合长时间间隔检查) 3. 错误处理:通过 `soft_fail=True` 使超时标记为`跳过`而非`失败` 4. 测试策略:使用 `airflow tasks test` 命令本地验证
数学表达[编辑 | 编辑源代码]
Sensor 的检查次数计算公式: 其中:
- = 最大检查次数
- = 总超时时间(秒)
- = 检查间隔(秒)
实际案例[编辑 | 编辑源代码]
场景:ETL 管道依赖上游系统生成数据文件 解决方案: 1. 使用 `FileSensor` 监控 `/data/inputs/` 目录 2. 文件到达后触发 Spark 作业处理数据 3. 通过 `ExternalTaskSensor` 等待后续质量检查 DAG 完成
常见问题[编辑 | 编辑源代码]
Q: Sensor 卡住不退出怎么办? A: 检查: - 是否正确实现 `poke()` 方法(自定义 Sensor) - 超时设置是否足够 - 外部系统是否可达
Q: 如何提高监控精度? A: 减小 `poke_interval`,但需权衡系统负载