跳转到内容

Airflow与BigQuery交互

来自代码酷

Airflow与BigQuery交互[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Apache Airflow 是一个开源的工作流自动化工具,用于编排复杂的数据管道。Google BigQuery 是Google Cloud提供的全托管数据仓库服务,支持大规模数据分析。通过将Airflow与BigQuery集成,用户可以自动化执行SQL查询、加载数据、导出结果等操作,实现端到端的数据处理流程。

本指南将详细介绍如何在Airflow中通过官方提供的BigQuery Operator和Hook与BigQuery交互,涵盖基础配置、任务定义、错误处理及实际案例。

前置条件[编辑 | 编辑源代码]

在开始之前,请确保满足以下条件:

  • 已安装并配置Apache Airflow(建议版本≥2.0)。
  • 拥有Google Cloud账号,并启用BigQuery API。
  • 配置了Google Cloud服务账号密钥(JSON格式),并授予BigQuery相关权限(如`bigquery.admin`)。

配置Airflow连接[编辑 | 编辑源代码]

在Airflow中,需先创建与BigQuery的连接: 1. 登录Airflow Web UI,进入AdminConnections。 2. 点击Add a new record,填写以下参数:

  * Connection ID: `google_cloud_default`(或其他自定义名称)  
  * Connection Type: `Google Cloud`  
  * Keyfile JSON: 粘贴服务账号密钥的JSON内容  
  
# 也可以通过CLI创建连接(示例)  
airflow connections add google_cloud_default \  
    --conn-type google_cloud_platform \  
    --conn-extra '{"extra__google_cloud_platform__keyfile_json": "PASTE_JSON_HERE"}'

核心操作[编辑 | 编辑源代码]

1. 执行SQL查询[编辑 | 编辑源代码]

使用`BigQueryExecuteQueryOperator`运行查询并将结果存储到临时表或目标表。

  
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator  

run_query = BigQueryExecuteQueryOperator(  
    task_id="run_query",  
    sql="SELECT COUNT(*) FROM `project.dataset.table`",  
    use_legacy_sql=False,  
    destination_dataset_table="project.dataset.results_table",  
    write_disposition="WRITE_TRUNCATE",  # 覆盖现有数据  
    gcp_conn_id="google_cloud_default"  
)

2. 数据加载与导出[编辑 | 编辑源代码]

  • **从GCS加载数据到BigQuery**:
  
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator  

load_data = GCSToBigQueryOperator(  
    task_id="load_data",  
    bucket="my-bucket",  
    source_objects=["data/file.csv"],  
    destination_project_dataset_table="project.dataset.target_table",  
    schema_fields=[{"name": "id", "type": "INT64"}, {"name": "name", "type": "STRING"}],  
    skip_leading_rows=1,  
    write_disposition="WRITE_TRUNCATE",  
    gcp_conn_id="google_cloud_default"  
)
  • **导出BigQuery数据到GCS**:
  
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator  

export_data = BigQueryToGCSOperator(  
    task_id="export_data",  
    source_project_dataset_table="project.dataset.source_table",  
    destination_cloud_storage_uris=["gs://my-bucket/export/data.csv"],  
    export_format="CSV",  
    gcp_conn_id="google_cloud_default"  
)

3. 使用BigQuery Hook[编辑 | 编辑源代码]

如需更灵活的操作,可直接调用`BigQueryHook`:

  
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook  

def query_and_save():  
    hook = BigQueryHook(gcp_conn_id="google_cloud_default")  
    records = hook.get_records(sql="SELECT * FROM `project.dataset.table` LIMIT 10")  
    print(f"Fetched {len(records)} rows")  

custom_task = PythonOperator(task_id="custom_query", python_callable=query_and_save)

实际案例[编辑 | 编辑源代码]

场景:每日销售报表自动化[编辑 | 编辑源代码]

1. **从GCS加载原始销售数据**到BigQuery临时表。 2. **执行聚合查询**生成日报表。 3. **导出报表**到GCS供下游系统使用。

graph TD A[Start] --> B[Load CSV from GCS] B --> C[Run Aggregation Query] C --> D[Export Report to GCS] D --> E[End]

对应DAG代码:

  
with DAG("daily_sales_report", schedule_interval="@daily") as dag:  
    load_data = GCSToBigQueryOperator(task_id="load_sales_data", ...)  
    run_query = BigQueryExecuteQueryOperator(  
        task_id="generate_report",  
        sql="""  
            SELECT  
                date,  
                SUM(revenue) as total_revenue  
            FROM `project.temp.sales`  
            GROUP BY date  
        """  
    )  
    export_report = BigQueryToGCSOperator(task_id="export_report", ...)  

    load_data >> run_query >> export_report

高级配置[编辑 | 编辑源代码]

错误处理与重试[编辑 | 编辑源代码]

通过Airflow的`retries`和`retry_delay`参数实现自动重试:

  
run_query = BigQueryExecuteQueryOperator(  
    task_id="run_query",  
    sql="...",  
    retries=3,  
    retry_delay=timedelta(minutes=5),  
    ...  
)

动态SQL生成[编辑 | 编辑源代码]

使用Jinja模板传递参数:

  
run_query = BigQueryExecuteQueryOperator(  
    task_id="run_query",  
    sql="SELECT * FROM `{{ params.table }}` WHERE date = '{{ ds }}'",  
    params={"table": "project.dataset.sales"},  
    ...  
)

常见问题[编辑 | 编辑源代码]

  • 权限不足:确保服务账号有`bigquery.tables.create`和`bigquery.jobs.create`权限。
  • SQL语法错误:设置`use_legacy_sql=False`以使用标准SQL。
  • 超时问题:调整`execution_timeout`参数(如`datetime.timedelta(minutes=30)`)。

总结[编辑 | 编辑源代码]

通过Airflow与BigQuery集成,用户能够高效编排数据管道,实现从数据加载、转换到分发的全流程自动化。结合Airflow的调度能力和BigQuery的弹性计算,可轻松应对TB级数据处理需求。