跳转到内容

Airflow与GCS交互

来自代码酷

Airflow与GCS交互[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Apache Airflow 是一个开源的工作流编排工具,常用于调度和监控复杂的数据管道。Google Cloud Storage (GCS) 是 Google Cloud Platform (GCP) 提供的对象存储服务,广泛用于存储和管理大规模数据。Airflow 与 GCS 的交互使得用户能够自动化地从 GCS 读取数据、写入数据或触发基于存储事件的工作流。

本指南将介绍如何在 Airflow 中集成 GCS,包括配置连接、使用 Operators 和 Hooks,以及实际应用案例。

配置 GCS 连接[编辑 | 编辑源代码]

在 Airflow 中与 GCS 交互,首先需要配置一个 GCP 连接。以下步骤说明如何通过 Airflow UI 设置连接:

1. 登录 Airflow Web UI。 2. 导航至 Admin > Connections。 3. 点击 Add a new record。 4. 填写以下字段:

  * Connection ID: 例如 `gcs_conn`  
  * Connection Type: 选择 `Google Cloud`  
  * Project ID: 你的 GCP 项目 ID  
  * Keyfile JSON: 上传或粘贴服务账户的 JSON 密钥文件  

使用 GCS Operators 和 Hooks[编辑 | 编辑源代码]

Airflow 提供了多种方式与 GCS 交互,包括 Operators(用于任务执行)和 Hooks(用于底层交互)。

1. GCSToBigQueryOperator[编辑 | 编辑源代码]

此 Operator 用于将 GCS 中的数据加载到 BigQuery 表中。

  
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator  

load_gcs_to_bq = GCSToBigQueryOperator(  
    task_id='gcs_to_bq',  
    bucket='my-gcs-bucket',  
    source_objects=['data/file.csv'],  
    destination_project_dataset_table='my_project.my_dataset.my_table',  
    schema_fields=[  
        {'name': 'id', 'type': 'INTEGER'},  
        {'name': 'name', 'type': 'STRING'},  
    ],  
    write_disposition='WRITE_TRUNCATE',  
    google_cloud_storage_conn_id='gcs_conn',  
    dag=dag  
)

2. GCSHook[编辑 | 编辑源代码]

GCSHook 提供了直接与 GCS 交互的方法,例如上传、下载或列出文件。

  
from airflow.providers.google.cloud.hooks.gcs import GCSHook  

def upload_to_gcs(bucket, object_name, local_file):  
    hook = GCSHook(gcp_conn_id='gcs_conn')  
    hook.upload(bucket_name=bucket, object_name=object_name, filename=local_file)  

upload_to_gcs('my-gcs-bucket', 'data/uploaded_file.txt', '/local/path/file.txt')

实际案例[编辑 | 编辑源代码]

场景:自动化数据管道[编辑 | 编辑源代码]

假设我们需要每天从 GCS 读取 CSV 文件,处理后写入另一个 GCS 路径,并触发 BigQuery 加载。

1. **DAG 定义**:

  
from airflow import DAG  
from airflow.operators.python import PythonOperator  
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator  
from datetime import datetime  

def process_file(bucket, source, destination):  
    # 模拟数据处理逻辑  
    print(f"Processing {source} and saving to {destination}")  

with DAG('gcs_data_pipeline', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:  
    process_task = PythonOperator(  
        task_id='process_file',  
        python_callable=process_file,  
        op_kwargs={'bucket': 'my-gcs-bucket', 'source': 'raw/data.csv', 'destination': 'processed/data_clean.csv'},  
    )  

    load_task = GCSToBigQueryOperator(  
        task_id='load_to_bq',  
        bucket='my-gcs-bucket',  
        source_objects=['processed/data_clean.csv'],  
        destination_project_dataset_table='my_project.my_dataset.processed_table',  
        schema_fields=[...],  
        write_disposition='WRITE_TRUNCATE',  
        google_cloud_storage_conn_id='gcs_conn',  
    )  

    process_task >> load_task

输出说明[编辑 | 编辑源代码]

  • `process_task` 模拟文件处理逻辑。
  • `load_task` 将处理后的文件加载到 BigQuery。

高级主题:使用 GCS 触发 Airflow DAG[编辑 | 编辑源代码]

可以通过 GCS 的 Cloud Storage Pub/Sub 通知 触发 Airflow DAG,实现事件驱动的管道。

1. 在 GCS 中配置 Pub/Sub 通知。 2. 使用 `GCSPrefixSensor` 或 `GCSObjectExistenceSensor` 监听文件变化。

  
from airflow.sensors.gcs import GCSObjectExistenceSensor  

file_sensor = GCSObjectExistenceSensor(  
    task_id='check_file_exists',  
    bucket='my-gcs-bucket',  
    object='data/new_file.csv',  
    google_cloud_conn_id='gcs_conn',  
    dag=dag  
)

总结[编辑 | 编辑源代码]

Airflow 与 GCS 的集成为数据工程师提供了强大的工具,用于构建自动化、可扩展的数据管道。通过 Operators 和 Hooks,用户可以轻松实现文件传输、事件触发和与 BigQuery 的集成。

参见[编辑 | 编辑源代码]

graph LR A[GCS Bucket] -->|File Upload| B(Airflow DAG) B --> C[Process Data] C --> D[Load to BigQuery]