跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow S3KeySensor
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow S3KeySensor = == 介绍 == '''S3KeySensor''' 是 Apache Airflow 中的一个传感器(Sensor),用于监控 Amazon S3(Simple Storage Service)存储桶中是否存在指定的文件(或键)。传感器是 Airflow 的一种特殊运算符,它会持续检查某个条件是否满足,直到条件为真或达到超时限制。S3KeySensor 常用于数据管道中,确保下游任务只在所需文件就绪时执行。 == 工作原理 == S3KeySensor 通过 AWS API 定期检查 S3 存储桶中是否存在指定的键(文件路径)。如果文件存在,传感器任务标记为成功,后续任务开始执行;如果文件不存在,传感器会等待并重试,直到文件出现或达到最大重试次数。 <mermaid> graph TD A[开始] --> B[S3KeySensor 检查文件是否存在] B -- 文件存在 --> C[标记为成功,触发下游任务] B -- 文件不存在 --> D[等待并重试] D --> B </mermaid> == 参数说明 == 以下是 S3KeySensor 的主要参数: * '''bucket_name'''(必需):要监控的 S3 存储桶名称。 * '''bucket_key'''(必需):要检查的文件键(路径)。 * '''aws_conn_id''':AWS 连接的 ID,默认为 'aws_default'。 * '''wildcard_match''':是否支持通配符匹配(如 `*.csv`)。 * '''timeout''':传感器等待的最大秒数。 * '''poke_interval''':两次检查之间的间隔时间(秒)。 == 代码示例 == 以下是一个使用 S3KeySensor 的完整示例: <syntaxhighlight lang="python"> from airflow import DAG from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'retries': 3, 'retry_delay': timedelta(minutes=5), } with DAG( 's3_file_sensor_example', default_args=default_args, schedule_interval=timedelta(days=1), ) as dag: wait_for_file = S3KeySensor( task_id='wait_for_s3_file', bucket_name='my-data-bucket', bucket_key='incoming/data_*.csv', wildcard_match=True, aws_conn_id='aws_default', timeout=3600, poke_interval=60, ) # 下游任务可以在这里定义 # process_data = PythonOperator(...) </syntaxhighlight> === 示例说明 === 1. 传感器会检查 `my-data-bucket` 存储桶中是否存在匹配 `incoming/data_*.csv` 的文件。 2. 使用通配符(`*`)匹配多个文件。 3. 每次检查间隔 60 秒(`poke_interval`)。 4. 如果 3600 秒(1 小时)内未找到文件,任务失败。 == 实际应用场景 == '''场景:数据管道中的文件依赖''' 假设一个数据分析管道需要每天处理上传到 S3 的 CSV 文件。文件由上游系统上传,时间不固定。使用 S3KeySensor 可以确保处理任务只在文件就绪时运行。 <mermaid> graph LR A[上游系统] -->|上传文件| B[S3 存储桶] B --> C[S3KeySensor 检测文件] C --> D[数据处理任务] </mermaid> == 高级配置 == === 使用 AWS 连接 === 确保在 Airflow 中配置了 AWS 连接(`aws_default`)。可以通过 UI(Admin -> Connections)或 CLI 设置: <syntaxhighlight lang="bash"> airflow connections add aws_default \ --conn-type aws \ --conn-extra '{"aws_access_key_id": "YOUR_KEY", "aws_secret_access_key": "YOUR_SECRET"}' </syntaxhighlight> === 动态键名 === 如果需要动态生成键名(如基于日期),可以使用 Airflow 的模板变量: <syntaxhighlight lang="python"> bucket_key='incoming/data_{{ ds_nodash }}.csv', </syntaxhighlight> == 常见问题 == '''Q: 传感器一直等待,但文件已存在?''' * 检查 AWS 权限(传感器需要 `s3:ListBucket` 和 `s3:GetObject` 权限)。 * 确认键名是否正确(区分大小写)。 '''Q: 如何减少 AWS API 调用?''' * 增加 `poke_interval`(如 300 秒)。 * 避免使用通配符(`wildcard_match=False`),因为通配符会列出所有文件。 == 数学基础 == 传感器的重试逻辑可以用以下公式描述: <math> \text{总等待时间} = \min(\text{timeout}, \text{retries} \times \text{retry\_delay}) </math> == 总结 == S3KeySensor 是 Airflow 中监控 S3 文件的强大工具,特别适合异步数据管道。通过合理配置超时和重试策略,可以构建健壮的文件驱动工作流。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow Sensors应用]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)