跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow FileSensor
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow FileSensor = '''FileSensor''' 是 Apache Airflow 中的一个传感器(Sensor)类,用于监控文件系统中的文件是否存在。传感器是 Airflow 的一种特殊运算符(Operator),它们会持续检查某个条件是否满足,直到条件为真或超时为止。FileSensor 特别适用于需要等待外部系统生成文件的场景,例如数据管道中的文件依赖。 == 基本概念 == FileSensor 继承自基类 <code>BaseSensorOperator</code>,其主要功能是定期检查指定路径下的文件是否存在。如果文件存在,则任务成功;如果超时仍未检测到文件,则任务失败。 === 核心参数 === FileSensor 的主要参数包括: * <code>filepath</code>:要监控的文件路径(可以是绝对路径或相对路径)。 * <code>fs_conn_id</code>(可选):文件系统连接的 ID,用于访问远程存储(如 S3、HDFS 等)。如果未提供,则使用本地文件系统。 * <code>poke_interval</code>:两次检查之间的间隔时间(默认 60 秒)。 * <code>timeout</code>:传感器等待文件的最大时间(默认 7 天)。 * <code>mode</code>:传感器的运行模式,可以是 <code>poke</code>(默认)或 <code>reschedule</code>。 == 代码示例 == 以下是一个简单的 FileSensor 示例,监控本地文件系统中的文件: <syntaxhighlight lang="python"> from airflow import DAG from airflow.sensors.filesystem import FileSensor from datetime import datetime with DAG( dag_id="file_sensor_example", start_date=datetime(2023, 1, 1), schedule_interval="@daily", ) as dag: wait_for_file = FileSensor( task_id="wait_for_file", filepath="/data/input_file.csv", # 监控的文件路径 poke_interval=30, # 每 30 秒检查一次 timeout=3600, # 超时时间为 1 小时 ) </syntaxhighlight> === 输出说明 === * 如果文件 <code>/data/input_file.csv</code> 在 1 小时内出现,任务标记为成功。 * 如果超时仍未检测到文件,任务标记为失败。 == 实际应用场景 == FileSensor 在以下场景中非常有用: 1. '''数据管道依赖''':等待上游系统生成输入文件后再启动数据处理任务。 2. '''批处理作业''':监控某个目录下的文件,一旦所有输入文件就绪,触发后续任务。 3. '''跨系统集成''':例如等待云存储(如 S3)中的文件上传完成。 === 远程文件系统示例 === 如果需要监控远程存储(如 Amazon S3),可以通过 <code>fs_conn_id</code> 指定连接: <syntaxhighlight lang="python"> wait_for_s3_file = FileSensor( task_id="wait_for_s3_file", filepath="s3://my-bucket/input_data.json", fs_conn_id="aws_conn", # 预先配置的 AWS 连接 poke_interval=60, ) </syntaxhighlight> == 高级配置 == === 使用通配符 === FileSensor 支持通配符(如 <code>*</code> 和 <code>?</code>)来匹配多个文件: <syntaxhighlight lang="python"> wait_for_multiple_files = FileSensor( task_id="wait_for_logs", filepath="/logs/app_*.log", # 匹配所有以 app_ 开头且以 .log 结尾的文件 ) </syntaxhighlight> === 自定义检查逻辑 === 可以通过继承 <code>FileSensor</code> 并重写 <code>poke()</code> 方法来实现自定义逻辑: <syntaxhighlight lang="python"> from airflow.sensors.filesystem import FileSensor class CustomFileSensor(FileSensor): def poke(self, context): # 自定义检查逻辑,例如检查文件大小是否大于 0 filepath = self.get_filepath(context) return os.path.exists(filepath) and os.path.getsize(filepath) > 0 </syntaxhighlight> == 性能优化 == * '''调整 <code>poke_interval</code>''':根据文件生成的预期频率设置合理的检查间隔,避免频繁检查浪费资源。 * '''使用 <code>mode='reschedule'</code>''':在长时间等待的场景下,释放工作器资源: <syntaxhighlight lang="python"> FileSensor( task_id="optimized_sensor", filepath="/data/large_file.bin", mode="reschedule", # 检查间隔期间释放工作器插槽 ) </syntaxhighlight> == 常见问题 == === 文件存在但传感器未触发 === 可能原因: 1. 文件权限问题:确保 Airflow 工作进程有权限访问该文件。 2. 路径错误:检查是否使用了绝对路径。 === 超时时间设置 === 对于关键任务,设置足够长的 <code>timeout</code>;对于非关键任务,可以缩短超时时间以避免资源浪费。 == 总结 == FileSensor 是 Airflow 中监控文件系统的强大工具,特别适合需要文件依赖的自动化工作流。通过合理配置参数和优化检查逻辑,可以高效地实现文件触发的数据管道。 <mermaid> graph LR A[上游系统] -->|生成文件| B(FileSensor) B -->|文件存在| C[下游任务] B -->|超时| D[报警或重试] </mermaid> [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow Sensors应用]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)