Airflow与GCS交互
Airflow与GCS交互[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Apache Airflow 是一个开源的工作流编排工具,常用于调度和监控复杂的数据管道。Google Cloud Storage (GCS) 是 Google Cloud Platform (GCP) 提供的对象存储服务,广泛用于存储和管理大规模数据。Airflow 与 GCS 的交互使得用户能够自动化地从 GCS 读取数据、写入数据或触发基于存储事件的工作流。
本指南将介绍如何在 Airflow 中集成 GCS,包括配置连接、使用 Operators 和 Hooks,以及实际应用案例。
配置 GCS 连接[编辑 | 编辑源代码]
在 Airflow 中与 GCS 交互,首先需要配置一个 GCP 连接。以下步骤说明如何通过 Airflow UI 设置连接:
1. 登录 Airflow Web UI。 2. 导航至 Admin > Connections。 3. 点击 Add a new record。 4. 填写以下字段:
* Connection ID: 例如 `gcs_conn` * Connection Type: 选择 `Google Cloud` * Project ID: 你的 GCP 项目 ID * Keyfile JSON: 上传或粘贴服务账户的 JSON 密钥文件
使用 GCS Operators 和 Hooks[编辑 | 编辑源代码]
Airflow 提供了多种方式与 GCS 交互,包括 Operators(用于任务执行)和 Hooks(用于底层交互)。
1. GCSToBigQueryOperator[编辑 | 编辑源代码]
此 Operator 用于将 GCS 中的数据加载到 BigQuery 表中。
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_gcs_to_bq = GCSToBigQueryOperator(
task_id='gcs_to_bq',
bucket='my-gcs-bucket',
source_objects=['data/file.csv'],
destination_project_dataset_table='my_project.my_dataset.my_table',
schema_fields=[
{'name': 'id', 'type': 'INTEGER'},
{'name': 'name', 'type': 'STRING'},
],
write_disposition='WRITE_TRUNCATE',
google_cloud_storage_conn_id='gcs_conn',
dag=dag
)
2. GCSHook[编辑 | 编辑源代码]
GCSHook 提供了直接与 GCS 交互的方法,例如上传、下载或列出文件。
from airflow.providers.google.cloud.hooks.gcs import GCSHook
def upload_to_gcs(bucket, object_name, local_file):
hook = GCSHook(gcp_conn_id='gcs_conn')
hook.upload(bucket_name=bucket, object_name=object_name, filename=local_file)
upload_to_gcs('my-gcs-bucket', 'data/uploaded_file.txt', '/local/path/file.txt')
实际案例[编辑 | 编辑源代码]
场景:自动化数据管道[编辑 | 编辑源代码]
假设我们需要每天从 GCS 读取 CSV 文件,处理后写入另一个 GCS 路径,并触发 BigQuery 加载。
1. **DAG 定义**:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime
def process_file(bucket, source, destination):
# 模拟数据处理逻辑
print(f"Processing {source} and saving to {destination}")
with DAG('gcs_data_pipeline', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
process_task = PythonOperator(
task_id='process_file',
python_callable=process_file,
op_kwargs={'bucket': 'my-gcs-bucket', 'source': 'raw/data.csv', 'destination': 'processed/data_clean.csv'},
)
load_task = GCSToBigQueryOperator(
task_id='load_to_bq',
bucket='my-gcs-bucket',
source_objects=['processed/data_clean.csv'],
destination_project_dataset_table='my_project.my_dataset.processed_table',
schema_fields=[...],
write_disposition='WRITE_TRUNCATE',
google_cloud_storage_conn_id='gcs_conn',
)
process_task >> load_task
输出说明[编辑 | 编辑源代码]
- `process_task` 模拟文件处理逻辑。
- `load_task` 将处理后的文件加载到 BigQuery。
高级主题:使用 GCS 触发 Airflow DAG[编辑 | 编辑源代码]
可以通过 GCS 的 Cloud Storage Pub/Sub 通知 触发 Airflow DAG,实现事件驱动的管道。
1. 在 GCS 中配置 Pub/Sub 通知。 2. 使用 `GCSPrefixSensor` 或 `GCSObjectExistenceSensor` 监听文件变化。
from airflow.sensors.gcs import GCSObjectExistenceSensor
file_sensor = GCSObjectExistenceSensor(
task_id='check_file_exists',
bucket='my-gcs-bucket',
object='data/new_file.csv',
google_cloud_conn_id='gcs_conn',
dag=dag
)
总结[编辑 | 编辑源代码]
Airflow 与 GCS 的集成为数据工程师提供了强大的工具,用于构建自动化、可扩展的数据管道。通过 Operators 和 Hooks,用户可以轻松实现文件传输、事件触发和与 BigQuery 的集成。