Airflow S3KeySensor
Airflow S3KeySensor[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
S3KeySensor 是 Apache Airflow 中的一个传感器(Sensor),用于监控 Amazon S3(Simple Storage Service)存储桶中是否存在指定的文件(或键)。传感器是 Airflow 的一种特殊运算符,它会持续检查某个条件是否满足,直到条件为真或达到超时限制。S3KeySensor 常用于数据管道中,确保下游任务只在所需文件就绪时执行。
工作原理[编辑 | 编辑源代码]
S3KeySensor 通过 AWS API 定期检查 S3 存储桶中是否存在指定的键(文件路径)。如果文件存在,传感器任务标记为成功,后续任务开始执行;如果文件不存在,传感器会等待并重试,直到文件出现或达到最大重试次数。
参数说明[编辑 | 编辑源代码]
以下是 S3KeySensor 的主要参数:
- bucket_name(必需):要监控的 S3 存储桶名称。
- bucket_key(必需):要检查的文件键(路径)。
- aws_conn_id:AWS 连接的 ID,默认为 'aws_default'。
- wildcard_match:是否支持通配符匹配(如 `*.csv`)。
- timeout:传感器等待的最大秒数。
- poke_interval:两次检查之间的间隔时间(秒)。
代码示例[编辑 | 编辑源代码]
以下是一个使用 S3KeySensor 的完整示例:
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG(
's3_file_sensor_example',
default_args=default_args,
schedule_interval=timedelta(days=1),
) as dag:
wait_for_file = S3KeySensor(
task_id='wait_for_s3_file',
bucket_name='my-data-bucket',
bucket_key='incoming/data_*.csv',
wildcard_match=True,
aws_conn_id='aws_default',
timeout=3600,
poke_interval=60,
)
# 下游任务可以在这里定义
# process_data = PythonOperator(...)
示例说明[编辑 | 编辑源代码]
1. 传感器会检查 `my-data-bucket` 存储桶中是否存在匹配 `incoming/data_*.csv` 的文件。 2. 使用通配符(`*`)匹配多个文件。 3. 每次检查间隔 60 秒(`poke_interval`)。 4. 如果 3600 秒(1 小时)内未找到文件,任务失败。
实际应用场景[编辑 | 编辑源代码]
场景:数据管道中的文件依赖 假设一个数据分析管道需要每天处理上传到 S3 的 CSV 文件。文件由上游系统上传,时间不固定。使用 S3KeySensor 可以确保处理任务只在文件就绪时运行。
高级配置[编辑 | 编辑源代码]
使用 AWS 连接[编辑 | 编辑源代码]
确保在 Airflow 中配置了 AWS 连接(`aws_default`)。可以通过 UI(Admin -> Connections)或 CLI 设置:
airflow connections add aws_default \
--conn-type aws \
--conn-extra '{"aws_access_key_id": "YOUR_KEY", "aws_secret_access_key": "YOUR_SECRET"}'
动态键名[编辑 | 编辑源代码]
如果需要动态生成键名(如基于日期),可以使用 Airflow 的模板变量:
bucket_key='incoming/data_{{ ds_nodash }}.csv',
常见问题[编辑 | 编辑源代码]
Q: 传感器一直等待,但文件已存在?
- 检查 AWS 权限(传感器需要 `s3:ListBucket` 和 `s3:GetObject` 权限)。
- 确认键名是否正确(区分大小写)。
Q: 如何减少 AWS API 调用?
- 增加 `poke_interval`(如 300 秒)。
- 避免使用通配符(`wildcard_match=False`),因为通配符会列出所有文件。
数学基础[编辑 | 编辑源代码]
传感器的重试逻辑可以用以下公式描述: 解析失败 (语法错误): {\displaystyle \text{总等待时间} = \min(\text{timeout}, \text{retries} \times \text{retry\_delay}) }
总结[编辑 | 编辑源代码]
S3KeySensor 是 Airflow 中监控 S3 文件的强大工具,特别适合异步数据管道。通过合理配置超时和重试策略,可以构建健壮的文件驱动工作流。