Airflow与BigQuery交互
外观
Airflow与BigQuery交互[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Apache Airflow 是一个开源的工作流自动化工具,用于编排复杂的数据管道。Google BigQuery 是Google Cloud提供的全托管数据仓库服务,支持大规模数据分析。通过将Airflow与BigQuery集成,用户可以自动化执行SQL查询、加载数据、导出结果等操作,实现端到端的数据处理流程。
本指南将详细介绍如何在Airflow中通过官方提供的BigQuery Operator和Hook与BigQuery交互,涵盖基础配置、任务定义、错误处理及实际案例。
前置条件[编辑 | 编辑源代码]
在开始之前,请确保满足以下条件:
- 已安装并配置Apache Airflow(建议版本≥2.0)。
- 拥有Google Cloud账号,并启用BigQuery API。
- 配置了Google Cloud服务账号密钥(JSON格式),并授予BigQuery相关权限(如`bigquery.admin`)。
配置Airflow连接[编辑 | 编辑源代码]
在Airflow中,需先创建与BigQuery的连接: 1. 登录Airflow Web UI,进入Admin → Connections。 2. 点击Add a new record,填写以下参数:
* Connection ID: `google_cloud_default`(或其他自定义名称) * Connection Type: `Google Cloud` * Keyfile JSON: 粘贴服务账号密钥的JSON内容
# 也可以通过CLI创建连接(示例)
airflow connections add google_cloud_default \
--conn-type google_cloud_platform \
--conn-extra '{"extra__google_cloud_platform__keyfile_json": "PASTE_JSON_HERE"}'
核心操作[编辑 | 编辑源代码]
1. 执行SQL查询[编辑 | 编辑源代码]
使用`BigQueryExecuteQueryOperator`运行查询并将结果存储到临时表或目标表。
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
run_query = BigQueryExecuteQueryOperator(
task_id="run_query",
sql="SELECT COUNT(*) FROM `project.dataset.table`",
use_legacy_sql=False,
destination_dataset_table="project.dataset.results_table",
write_disposition="WRITE_TRUNCATE", # 覆盖现有数据
gcp_conn_id="google_cloud_default"
)
2. 数据加载与导出[编辑 | 编辑源代码]
- **从GCS加载数据到BigQuery**:
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_data = GCSToBigQueryOperator(
task_id="load_data",
bucket="my-bucket",
source_objects=["data/file.csv"],
destination_project_dataset_table="project.dataset.target_table",
schema_fields=[{"name": "id", "type": "INT64"}, {"name": "name", "type": "STRING"}],
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
gcp_conn_id="google_cloud_default"
)
- **导出BigQuery数据到GCS**:
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
export_data = BigQueryToGCSOperator(
task_id="export_data",
source_project_dataset_table="project.dataset.source_table",
destination_cloud_storage_uris=["gs://my-bucket/export/data.csv"],
export_format="CSV",
gcp_conn_id="google_cloud_default"
)
3. 使用BigQuery Hook[编辑 | 编辑源代码]
如需更灵活的操作,可直接调用`BigQueryHook`:
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
def query_and_save():
hook = BigQueryHook(gcp_conn_id="google_cloud_default")
records = hook.get_records(sql="SELECT * FROM `project.dataset.table` LIMIT 10")
print(f"Fetched {len(records)} rows")
custom_task = PythonOperator(task_id="custom_query", python_callable=query_and_save)
实际案例[编辑 | 编辑源代码]
场景:每日销售报表自动化[编辑 | 编辑源代码]
1. **从GCS加载原始销售数据**到BigQuery临时表。 2. **执行聚合查询**生成日报表。 3. **导出报表**到GCS供下游系统使用。
对应DAG代码:
with DAG("daily_sales_report", schedule_interval="@daily") as dag:
load_data = GCSToBigQueryOperator(task_id="load_sales_data", ...)
run_query = BigQueryExecuteQueryOperator(
task_id="generate_report",
sql="""
SELECT
date,
SUM(revenue) as total_revenue
FROM `project.temp.sales`
GROUP BY date
"""
)
export_report = BigQueryToGCSOperator(task_id="export_report", ...)
load_data >> run_query >> export_report
高级配置[编辑 | 编辑源代码]
错误处理与重试[编辑 | 编辑源代码]
通过Airflow的`retries`和`retry_delay`参数实现自动重试:
run_query = BigQueryExecuteQueryOperator(
task_id="run_query",
sql="...",
retries=3,
retry_delay=timedelta(minutes=5),
...
)
动态SQL生成[编辑 | 编辑源代码]
使用Jinja模板传递参数:
run_query = BigQueryExecuteQueryOperator(
task_id="run_query",
sql="SELECT * FROM `{{ params.table }}` WHERE date = '{{ ds }}'",
params={"table": "project.dataset.sales"},
...
)
常见问题[编辑 | 编辑源代码]
- 权限不足:确保服务账号有`bigquery.tables.create`和`bigquery.jobs.create`权限。
- SQL语法错误:设置`use_legacy_sql=False`以使用标准SQL。
- 超时问题:调整`execution_timeout`参数(如`datetime.timedelta(minutes=30)`)。
总结[编辑 | 编辑源代码]
通过Airflow与BigQuery集成,用户能够高效编排数据管道,实现从数据加载、转换到分发的全流程自动化。结合Airflow的调度能力和BigQuery的弹性计算,可轻松应对TB级数据处理需求。