跳转到内容

Airflow S3KeySensor

来自代码酷

Airflow S3KeySensor[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

S3KeySensor 是 Apache Airflow 中的一个传感器(Sensor),用于监控 Amazon S3(Simple Storage Service)存储桶中是否存在指定的文件(或键)。传感器是 Airflow 的一种特殊运算符,它会持续检查某个条件是否满足,直到条件为真或达到超时限制。S3KeySensor 常用于数据管道中,确保下游任务只在所需文件就绪时执行。

工作原理[编辑 | 编辑源代码]

S3KeySensor 通过 AWS API 定期检查 S3 存储桶中是否存在指定的键(文件路径)。如果文件存在,传感器任务标记为成功,后续任务开始执行;如果文件不存在,传感器会等待并重试,直到文件出现或达到最大重试次数。

graph TD A[开始] --> B[S3KeySensor 检查文件是否存在] B -- 文件存在 --> C[标记为成功,触发下游任务] B -- 文件不存在 --> D[等待并重试] D --> B

参数说明[编辑 | 编辑源代码]

以下是 S3KeySensor 的主要参数:

  • bucket_name(必需):要监控的 S3 存储桶名称。
  • bucket_key(必需):要检查的文件键(路径)。
  • aws_conn_id:AWS 连接的 ID,默认为 'aws_default'。
  • wildcard_match:是否支持通配符匹配(如 `*.csv`)。
  • timeout:传感器等待的最大秒数。
  • poke_interval:两次检查之间的间隔时间(秒)。

代码示例[编辑 | 编辑源代码]

以下是一个使用 S3KeySensor 的完整示例:

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    's3_file_sensor_example',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
) as dag:

    wait_for_file = S3KeySensor(
        task_id='wait_for_s3_file',
        bucket_name='my-data-bucket',
        bucket_key='incoming/data_*.csv',
        wildcard_match=True,
        aws_conn_id='aws_default',
        timeout=3600,
        poke_interval=60,
    )

    # 下游任务可以在这里定义
    # process_data = PythonOperator(...)

示例说明[编辑 | 编辑源代码]

1. 传感器会检查 `my-data-bucket` 存储桶中是否存在匹配 `incoming/data_*.csv` 的文件。 2. 使用通配符(`*`)匹配多个文件。 3. 每次检查间隔 60 秒(`poke_interval`)。 4. 如果 3600 秒(1 小时)内未找到文件,任务失败。

实际应用场景[编辑 | 编辑源代码]

场景:数据管道中的文件依赖 假设一个数据分析管道需要每天处理上传到 S3 的 CSV 文件。文件由上游系统上传,时间不固定。使用 S3KeySensor 可以确保处理任务只在文件就绪时运行。

graph LR A[上游系统] -->|上传文件| B[S3 存储桶] B --> C[S3KeySensor 检测文件] C --> D[数据处理任务]

高级配置[编辑 | 编辑源代码]

使用 AWS 连接[编辑 | 编辑源代码]

确保在 Airflow 中配置了 AWS 连接(`aws_default`)。可以通过 UI(Admin -> Connections)或 CLI 设置:

airflow connections add aws_default \
    --conn-type aws \
    --conn-extra '{"aws_access_key_id": "YOUR_KEY", "aws_secret_access_key": "YOUR_SECRET"}'

动态键名[编辑 | 编辑源代码]

如果需要动态生成键名(如基于日期),可以使用 Airflow 的模板变量:

bucket_key='incoming/data_{{ ds_nodash }}.csv',

常见问题[编辑 | 编辑源代码]

Q: 传感器一直等待,但文件已存在?

  • 检查 AWS 权限(传感器需要 `s3:ListBucket` 和 `s3:GetObject` 权限)。
  • 确认键名是否正确(区分大小写)。

Q: 如何减少 AWS API 调用?

  • 增加 `poke_interval`(如 300 秒)。
  • 避免使用通配符(`wildcard_match=False`),因为通配符会列出所有文件。

数学基础[编辑 | 编辑源代码]

传感器的重试逻辑可以用以下公式描述: 解析失败 (语法错误): {\displaystyle \text{总等待时间} = \min(\text{timeout}, \text{retries} \times \text{retry\_delay}) }

总结[编辑 | 编辑源代码]

S3KeySensor 是 Airflow 中监控 S3 文件的强大工具,特别适合异步数据管道。通过合理配置超时和重试策略,可以构建健壮的文件驱动工作流。