Airflow FileSensor
Airflow FileSensor[编辑 | 编辑源代码]
FileSensor 是 Apache Airflow 中的一个传感器(Sensor)类,用于监控文件系统中的文件是否存在。传感器是 Airflow 的一种特殊运算符(Operator),它们会持续检查某个条件是否满足,直到条件为真或超时为止。FileSensor 特别适用于需要等待外部系统生成文件的场景,例如数据管道中的文件依赖。
基本概念[编辑 | 编辑源代码]
FileSensor 继承自基类 BaseSensorOperator
,其主要功能是定期检查指定路径下的文件是否存在。如果文件存在,则任务成功;如果超时仍未检测到文件,则任务失败。
核心参数[编辑 | 编辑源代码]
FileSensor 的主要参数包括:
filepath
:要监控的文件路径(可以是绝对路径或相对路径)。fs_conn_id
(可选):文件系统连接的 ID,用于访问远程存储(如 S3、HDFS 等)。如果未提供,则使用本地文件系统。poke_interval
:两次检查之间的间隔时间(默认 60 秒)。timeout
:传感器等待文件的最大时间(默认 7 天)。mode
:传感器的运行模式,可以是poke
(默认)或reschedule
。
代码示例[编辑 | 编辑源代码]
以下是一个简单的 FileSensor 示例,监控本地文件系统中的文件:
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from datetime import datetime
with DAG(
dag_id="file_sensor_example",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
) as dag:
wait_for_file = FileSensor(
task_id="wait_for_file",
filepath="/data/input_file.csv", # 监控的文件路径
poke_interval=30, # 每 30 秒检查一次
timeout=3600, # 超时时间为 1 小时
)
输出说明[编辑 | 编辑源代码]
- 如果文件
/data/input_file.csv
在 1 小时内出现,任务标记为成功。 - 如果超时仍未检测到文件,任务标记为失败。
实际应用场景[编辑 | 编辑源代码]
FileSensor 在以下场景中非常有用:
1. 数据管道依赖:等待上游系统生成输入文件后再启动数据处理任务。 2. 批处理作业:监控某个目录下的文件,一旦所有输入文件就绪,触发后续任务。 3. 跨系统集成:例如等待云存储(如 S3)中的文件上传完成。
远程文件系统示例[编辑 | 编辑源代码]
如果需要监控远程存储(如 Amazon S3),可以通过 fs_conn_id
指定连接:
wait_for_s3_file = FileSensor(
task_id="wait_for_s3_file",
filepath="s3://my-bucket/input_data.json",
fs_conn_id="aws_conn", # 预先配置的 AWS 连接
poke_interval=60,
)
高级配置[编辑 | 编辑源代码]
使用通配符[编辑 | 编辑源代码]
FileSensor 支持通配符(如 *
和 ?
)来匹配多个文件:
wait_for_multiple_files = FileSensor(
task_id="wait_for_logs",
filepath="/logs/app_*.log", # 匹配所有以 app_ 开头且以 .log 结尾的文件
)
自定义检查逻辑[编辑 | 编辑源代码]
可以通过继承 FileSensor
并重写 poke()
方法来实现自定义逻辑:
from airflow.sensors.filesystem import FileSensor
class CustomFileSensor(FileSensor):
def poke(self, context):
# 自定义检查逻辑,例如检查文件大小是否大于 0
filepath = self.get_filepath(context)
return os.path.exists(filepath) and os.path.getsize(filepath) > 0
性能优化[编辑 | 编辑源代码]
- 调整
poke_interval
:根据文件生成的预期频率设置合理的检查间隔,避免频繁检查浪费资源。 - 使用
mode='reschedule'
:在长时间等待的场景下,释放工作器资源:
FileSensor(
task_id="optimized_sensor",
filepath="/data/large_file.bin",
mode="reschedule", # 检查间隔期间释放工作器插槽
)
常见问题[编辑 | 编辑源代码]
文件存在但传感器未触发[编辑 | 编辑源代码]
可能原因: 1. 文件权限问题:确保 Airflow 工作进程有权限访问该文件。 2. 路径错误:检查是否使用了绝对路径。
超时时间设置[编辑 | 编辑源代码]
对于关键任务,设置足够长的 timeout
;对于非关键任务,可以缩短超时时间以避免资源浪费。
总结[编辑 | 编辑源代码]
FileSensor 是 Airflow 中监控文件系统的强大工具,特别适合需要文件依赖的自动化工作流。通过合理配置参数和优化检查逻辑,可以高效地实现文件触发的数据管道。