跳转到内容

Airflow FileSensor

来自代码酷

Airflow FileSensor[编辑 | 编辑源代码]

FileSensor 是 Apache Airflow 中的一个传感器(Sensor)类,用于监控文件系统中的文件是否存在。传感器是 Airflow 的一种特殊运算符(Operator),它们会持续检查某个条件是否满足,直到条件为真或超时为止。FileSensor 特别适用于需要等待外部系统生成文件的场景,例如数据管道中的文件依赖。

基本概念[编辑 | 编辑源代码]

FileSensor 继承自基类 BaseSensorOperator,其主要功能是定期检查指定路径下的文件是否存在。如果文件存在,则任务成功;如果超时仍未检测到文件,则任务失败。

核心参数[编辑 | 编辑源代码]

FileSensor 的主要参数包括:

  • filepath:要监控的文件路径(可以是绝对路径或相对路径)。
  • fs_conn_id(可选):文件系统连接的 ID,用于访问远程存储(如 S3、HDFS 等)。如果未提供,则使用本地文件系统。
  • poke_interval:两次检查之间的间隔时间(默认 60 秒)。
  • timeout:传感器等待文件的最大时间(默认 7 天)。
  • mode:传感器的运行模式,可以是 poke(默认)或 reschedule

代码示例[编辑 | 编辑源代码]

以下是一个简单的 FileSensor 示例,监控本地文件系统中的文件:

from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

with DAG(
    dag_id="file_sensor_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/data/input_file.csv",  # 监控的文件路径
        poke_interval=30,  # 每 30 秒检查一次
        timeout=3600,  # 超时时间为 1 小时
    )

输出说明[编辑 | 编辑源代码]

  • 如果文件 /data/input_file.csv 在 1 小时内出现,任务标记为成功。
  • 如果超时仍未检测到文件,任务标记为失败。

实际应用场景[编辑 | 编辑源代码]

FileSensor 在以下场景中非常有用:

1. 数据管道依赖:等待上游系统生成输入文件后再启动数据处理任务。 2. 批处理作业:监控某个目录下的文件,一旦所有输入文件就绪,触发后续任务。 3. 跨系统集成:例如等待云存储(如 S3)中的文件上传完成。

远程文件系统示例[编辑 | 编辑源代码]

如果需要监控远程存储(如 Amazon S3),可以通过 fs_conn_id 指定连接:

wait_for_s3_file = FileSensor(
    task_id="wait_for_s3_file",
    filepath="s3://my-bucket/input_data.json",
    fs_conn_id="aws_conn",  # 预先配置的 AWS 连接
    poke_interval=60,
)

高级配置[编辑 | 编辑源代码]

使用通配符[编辑 | 编辑源代码]

FileSensor 支持通配符(如 *?)来匹配多个文件:

wait_for_multiple_files = FileSensor(
    task_id="wait_for_logs",
    filepath="/logs/app_*.log",  # 匹配所有以 app_ 开头且以 .log 结尾的文件
)

自定义检查逻辑[编辑 | 编辑源代码]

可以通过继承 FileSensor 并重写 poke() 方法来实现自定义逻辑:

from airflow.sensors.filesystem import FileSensor

class CustomFileSensor(FileSensor):
    def poke(self, context):
        # 自定义检查逻辑,例如检查文件大小是否大于 0
        filepath = self.get_filepath(context)
        return os.path.exists(filepath) and os.path.getsize(filepath) > 0

性能优化[编辑 | 编辑源代码]

  • 调整 poke_interval:根据文件生成的预期频率设置合理的检查间隔,避免频繁检查浪费资源。
  • 使用 mode='reschedule':在长时间等待的场景下,释放工作器资源:
FileSensor(
    task_id="optimized_sensor",
    filepath="/data/large_file.bin",
    mode="reschedule",  # 检查间隔期间释放工作器插槽
)

常见问题[编辑 | 编辑源代码]

文件存在但传感器未触发[编辑 | 编辑源代码]

可能原因: 1. 文件权限问题:确保 Airflow 工作进程有权限访问该文件。 2. 路径错误:检查是否使用了绝对路径。

超时时间设置[编辑 | 编辑源代码]

对于关键任务,设置足够长的 timeout;对于非关键任务,可以缩短超时时间以避免资源浪费。

总结[编辑 | 编辑源代码]

FileSensor 是 Airflow 中监控文件系统的强大工具,特别适合需要文件依赖的自动化工作流。通过合理配置参数和优化检查逻辑,可以高效地实现文件触发的数据管道。

graph LR A[上游系统] -->|生成文件| B(FileSensor) B -->|文件存在| C[下游任务] B -->|超时| D[报警或重试]