跳转到内容

Airflow与GCP集成

来自代码酷

Airflow与GCP集成[编辑 | 编辑源代码]

Apache Airflow 是一个开源的工作流自动化工具,用于编排复杂的数据管道。通过与 Google Cloud Platform (GCP) 集成,用户可以充分利用 GCP 提供的计算、存储和分析服务,构建可扩展的数据处理流程。本指南将详细介绍如何将 Airflow 与 GCP 集成,包括认证、服务调用和实际应用案例。

介绍[编辑 | 编辑源代码]

Airflow 与 GCP 的集成允许用户:

  • 在 GCP 上运行任务(如 Cloud Composer、Cloud Functions、BigQuery 等)。
  • 利用 GCP 的身份验证和授权机制(如 IAM 和服务账户)。
  • 动态调度和管理跨 GCP 服务的任务。

GCP 提供了多种与 Airflow 集成的选项,包括:

  • Cloud Composer:GCP 托管的 Airflow 服务。
  • GCP Operators:Airflow 提供的专用 Operator,用于与 GCP 服务交互。
  • 服务账户认证:确保 Airflow 安全访问 GCP 资源。

认证与配置[编辑 | 编辑源代码]

要与 GCP 集成,Airflow 需要配置服务账户和认证密钥。以下是步骤:

1. 创建 GCP 服务账户[编辑 | 编辑源代码]

在 GCP 控制台创建服务账户,并授予适当的权限(如 `roles/composer.worker` 或 `roles/bigquery.admin`)。

2. 生成密钥文件[编辑 | 编辑源代码]

生成 JSON 格式的服务账户密钥,并下载到 Airflow 服务器。

3. 配置 Airflow 连接[编辑 | 编辑源代码]

在 Airflow 的 `Connections` 界面中,添加 GCP 连接:

  • **Connection ID**:`google_cloud_default`
  • **Connection Type**:`Google Cloud`
  • **Keyfile Path**:指向服务账户密钥文件的路径。

或通过环境变量设置:

export GOOGLE_APPLICATION_CREDENTIALS="/path/to/keyfile.json"

GCP Operators 示例[编辑 | 编辑源代码]

Airflow 提供了多个 GCP 专用的 Operator,以下是一些常用示例:

1. BigQueryOperator[编辑 | 编辑源代码]

用于在 BigQuery 上执行 SQL 查询:

from airflow.providers.google.cloud.operators.bigquery import BigQueryOperator

run_query = BigQueryOperator(
    task_id="run_bigquery_query",
    sql="SELECT * FROM `project.dataset.table` LIMIT 1000;",
    destination_dataset_table="project.dataset.results_table",
    write_disposition="WRITE_TRUNCATE",
    gcp_conn_id="google_cloud_default",
    dag=dag,
)

2. GCSOperator[编辑 | 编辑源代码]

用于与 Google Cloud Storage 交互:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

load_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery",
    bucket="my-bucket",
    source_objects=["data/file.csv"],
    destination_project_dataset_table="project.dataset.table",
    schema_fields=[
        {"name": "id", "type": "INTEGER"},
        {"name": "name", "type": "STRING"},
    ],
    write_disposition="WRITE_TRUNCATE",
    gcp_conn_id="google_cloud_default",
    dag=dag,
)

实际案例:数据处理管道[编辑 | 编辑源代码]

以下是一个完整的 Airflow DAG 示例,展示如何从 GCS 加载数据到 BigQuery,并进行转换:

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryOperator
from datetime import datetime

default_args = {
    "owner": "airflow",
    "start_date": datetime(2023, 1, 1),
}

with DAG(
    "gcp_data_pipeline",
    default_args=default_args,
    schedule_interval="@daily",
) as dag:
    load_data = GCSToBigQueryOperator(
        task_id="load_data",
        bucket="my-bucket",
        source_objects=["sales_data/*.csv"],
        destination_project_dataset_table="project.dataset.sales_raw",
        schema_fields=[
            {"name": "date", "type": "DATE"},
            {"name": "product", "type": "STRING"},
            {"name": "revenue", "type": "FLOAT"},
        ],
        write_disposition="WRITE_TRUNCATE",
    )

    transform_data = BigQueryOperator(
        task_id="transform_data",
        sql="""
        SELECT 
            date,
            product,
            SUM(revenue) as total_revenue
        FROM `project.dataset.sales_raw`
        GROUP BY date, product
        """,
        destination_dataset_table="project.dataset.sales_summary",
        write_disposition="WRITE_TRUNCATE",
    )

    load_data >> transform_data

使用 Cloud Composer[编辑 | 编辑源代码]

Cloud Composer 是 GCP 托管的 Airflow 服务,简化了部署和管理:

  • 自动配置 GCP 连接。
  • 内置与 GCP 服务的集成。
  • 可扩展的计算资源。

部署示例[编辑 | 编辑源代码]

通过 `gcloud` 命令创建 Cloud Composer 环境:

gcloud composer environments create my-composer-env \
    --location us-central1 \
    --image-version composer-2.0.11-airflow-2.2.3

总结[编辑 | 编辑源代码]

Airflow 与 GCP 的集成提供了强大的工作流编排能力,适用于数据管道、ETL 处理和自动化任务。通过 GCP Operators 和 Cloud Composer,用户可以高效地构建和管理复杂的数据处理流程。