Airflow与GCP集成
Airflow与GCP集成[编辑 | 编辑源代码]
Apache Airflow 是一个开源的工作流自动化工具,用于编排复杂的数据管道。通过与 Google Cloud Platform (GCP) 集成,用户可以充分利用 GCP 提供的计算、存储和分析服务,构建可扩展的数据处理流程。本指南将详细介绍如何将 Airflow 与 GCP 集成,包括认证、服务调用和实际应用案例。
介绍[编辑 | 编辑源代码]
Airflow 与 GCP 的集成允许用户:
- 在 GCP 上运行任务(如 Cloud Composer、Cloud Functions、BigQuery 等)。
- 利用 GCP 的身份验证和授权机制(如 IAM 和服务账户)。
- 动态调度和管理跨 GCP 服务的任务。
GCP 提供了多种与 Airflow 集成的选项,包括:
- Cloud Composer:GCP 托管的 Airflow 服务。
- GCP Operators:Airflow 提供的专用 Operator,用于与 GCP 服务交互。
- 服务账户认证:确保 Airflow 安全访问 GCP 资源。
认证与配置[编辑 | 编辑源代码]
要与 GCP 集成,Airflow 需要配置服务账户和认证密钥。以下是步骤:
1. 创建 GCP 服务账户[编辑 | 编辑源代码]
在 GCP 控制台创建服务账户,并授予适当的权限(如 `roles/composer.worker` 或 `roles/bigquery.admin`)。
2. 生成密钥文件[编辑 | 编辑源代码]
生成 JSON 格式的服务账户密钥,并下载到 Airflow 服务器。
3. 配置 Airflow 连接[编辑 | 编辑源代码]
在 Airflow 的 `Connections` 界面中,添加 GCP 连接:
- **Connection ID**:`google_cloud_default`
- **Connection Type**:`Google Cloud`
- **Keyfile Path**:指向服务账户密钥文件的路径。
或通过环境变量设置:
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/keyfile.json"
GCP Operators 示例[编辑 | 编辑源代码]
Airflow 提供了多个 GCP 专用的 Operator,以下是一些常用示例:
1. BigQueryOperator[编辑 | 编辑源代码]
用于在 BigQuery 上执行 SQL 查询:
from airflow.providers.google.cloud.operators.bigquery import BigQueryOperator
run_query = BigQueryOperator(
task_id="run_bigquery_query",
sql="SELECT * FROM `project.dataset.table` LIMIT 1000;",
destination_dataset_table="project.dataset.results_table",
write_disposition="WRITE_TRUNCATE",
gcp_conn_id="google_cloud_default",
dag=dag,
)
2. GCSOperator[编辑 | 编辑源代码]
用于与 Google Cloud Storage 交互:
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery",
bucket="my-bucket",
source_objects=["data/file.csv"],
destination_project_dataset_table="project.dataset.table",
schema_fields=[
{"name": "id", "type": "INTEGER"},
{"name": "name", "type": "STRING"},
],
write_disposition="WRITE_TRUNCATE",
gcp_conn_id="google_cloud_default",
dag=dag,
)
实际案例:数据处理管道[编辑 | 编辑源代码]
以下是一个完整的 Airflow DAG 示例,展示如何从 GCS 加载数据到 BigQuery,并进行转换:
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryOperator
from datetime import datetime
default_args = {
"owner": "airflow",
"start_date": datetime(2023, 1, 1),
}
with DAG(
"gcp_data_pipeline",
default_args=default_args,
schedule_interval="@daily",
) as dag:
load_data = GCSToBigQueryOperator(
task_id="load_data",
bucket="my-bucket",
source_objects=["sales_data/*.csv"],
destination_project_dataset_table="project.dataset.sales_raw",
schema_fields=[
{"name": "date", "type": "DATE"},
{"name": "product", "type": "STRING"},
{"name": "revenue", "type": "FLOAT"},
],
write_disposition="WRITE_TRUNCATE",
)
transform_data = BigQueryOperator(
task_id="transform_data",
sql="""
SELECT
date,
product,
SUM(revenue) as total_revenue
FROM `project.dataset.sales_raw`
GROUP BY date, product
""",
destination_dataset_table="project.dataset.sales_summary",
write_disposition="WRITE_TRUNCATE",
)
load_data >> transform_data
使用 Cloud Composer[编辑 | 编辑源代码]
Cloud Composer 是 GCP 托管的 Airflow 服务,简化了部署和管理:
- 自动配置 GCP 连接。
- 内置与 GCP 服务的集成。
- 可扩展的计算资源。
部署示例[编辑 | 编辑源代码]
通过 `gcloud` 命令创建 Cloud Composer 环境:
gcloud composer environments create my-composer-env \
--location us-central1 \
--image-version composer-2.0.11-airflow-2.2.3
总结[编辑 | 编辑源代码]
Airflow 与 GCP 的集成提供了强大的工作流编排能力,适用于数据管道、ETL 处理和自动化任务。通过 GCP Operators 和 Cloud Composer,用户可以高效地构建和管理复杂的数据处理流程。