跳转到内容

Airflow与Azure Blob交互

来自代码酷
Admin留言 | 贡献2025年4月29日 (二) 18:51的版本 (Page creation by admin bot)

(差异) ←上一版本 | 已核准修订 (差异) | 最后版本 (差异) | 下一版本→ (差异)

Airflow与Azure Blob交互[编辑 | 编辑源代码]

概述[编辑 | 编辑源代码]

Apache Airflow是一个开源的工作流自动化工具,用于编排复杂的计算任务。Azure Blob Storage是微软Azure提供的对象存储服务,用于存储非结构化数据。通过将Airflow与Azure Blob集成,用户可以自动化数据管道,例如从Blob存储中提取数据、转换数据并加载到其他系统中。

本指南将详细介绍如何在Airflow中配置Azure Blob连接,并通过实际操作示例展示常见用例。

先决条件[编辑 | 编辑源代码]

在开始之前,请确保:

  • 已安装Apache Airflow(建议版本2.0+)
  • 拥有Azure订阅并创建了Blob存储账户
  • 已安装Azure Storage Blob Python SDK(可通过`pip install azure-storage-blob`安装)

配置Azure Blob连接[编辑 | 编辑源代码]

1. 获取Azure Blob凭据[编辑 | 编辑源代码]

在Azure门户中,导航到存储账户,获取以下信息:

  • **存储账户名称**(Storage Account Name)
  • **访问密钥**(Access Key)或使用SAS令牌
  • **容器名称**(Container Name)

2. 在Airflow中创建连接[编辑 | 编辑源代码]

通过Airflow UI或环境变量配置Azure Blob连接:

# 使用Airflow UI配置:
1. 进入Admin -> Connections
2. 点击"Add a new record"
3. 填写以下字段
   - Conn Id: `azure_blob_default`
   - Conn Type: `Azure Blob Storage`
   - Extra: 
     {
       "account_name": "<your-account-name>",
       "account_key": "<your-account-key>"
     }

或通过环境变量:

export AIRFLOW_CONN_AZURE_BLOB_DEFAULT='azure_blob_storage://<account-name>:<account-key>@'

基本操作示例[编辑 | 编辑源代码]

上传文件到Azure Blob[编辑 | 编辑源代码]

使用`AzureBlobStorageHook`上传文件:

from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

def upload_to_blob():
    hook = WasbHook(wasb_conn_id='azure_blob_default')
    hook.load_file(
        file_path='/local/path/to/file.txt',
        container_name='mycontainer',
        blob_name='remote_file.txt',
        overwrite=True
    )

从Azure Blob下载文件[编辑 | 编辑源代码]

def download_from_blob():
    hook = WasbHook(wasb_conn_id='azure_blob_default')
    hook.get_file(
        file_path='/local/path/to/download.txt',
        container_name='mycontainer',
        blob_name='remote_file.txt'
    )

高级用例[编辑 | 编辑源代码]

使用AzureDataLakeStorageOperator[编辑 | 编辑源代码]

对于需要处理大数据的场景,可以使用专用Operator:

from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator

upload_task = LocalFilesystemToWasbOperator(
    task_id='upload_to_blob',
    wasb_conn_id='azure_blob_default',
    file_path='/local/path/file.csv',
    container_name='data-lake',
    blob_name='processed/file_{ ds_nodash }.csv',
    dag=dag
)

监控Blob存储[编辑 | 编辑源代码]

创建传感器来监控新文件到达:

from airflow.providers.microsoft.azure.sensors.wasb import WasbBlobSensor

wait_for_file = WasbBlobSensor(
    task_id='wait_for_processing_file',
    wasb_conn_id='azure_blob_default',
    container_name='inputs',
    blob_name='data_to_process.csv',
    dag=dag
)

最佳实践[编辑 | 编辑源代码]

1. 使用环境变量存储敏感凭据,而非硬编码 2. 实现重试机制处理暂时性网络故障 3. 考虑使用SAS令牌而非账户密钥,提高安全性 4. 监控存储成本,定期清理不需要的文件

故障排除[编辑 | 编辑源代码]

错误 可能原因 解决方案
`403 Forbidden` 权限不足 检查账户密钥/SAS令牌是否有效
`404 Not Found` Blob或容器不存在 验证容器和Blob名称拼写
`TimeoutError` 网络问题 增加超时时间或检查网络连接

性能考虑[编辑 | 编辑源代码]

  • 对于大文件,考虑使用分块上传(Azure SDK自动处理)
  • 批量操作时,使用`BlobServiceClient`保持连接复用
  • 在Azure区域内部署Airflow worker以减少延迟

实际应用案例[编辑 | 编辑源代码]

电商数据管道示例

AzureBlobStorageHook
WasbBlobSensor
AzureDataLakeStorageOperator
WasbHook
用户行为日志CSV
Azure Blob
Airflow DAG
检测新文件
处理数据
结果写回Blob

实现代码片段

with DAG('ecommerce_pipeline', schedule_interval='@daily') as dag:
    detect_file = WasbBlobSensor(
        task_id='detect_new_logs',
        container_name='raw-logs',
        blob_name='user_activity_{{ ds }}.csv'
    )
    
    process_data = PythonOperator(
        task_id='transform_data',
        python_callable=process_user_activity
    )
    
    upload_results = LocalFilesystemToWasbOperator(
        task_id='store_results',
        file_path='/tmp/processed_{{ ds }}.parquet',
        container_name='processed-data',
        blob_name='user_metrics/{{ ds }}/data.parquet'
    )
    
    detect_file >> process_data >> upload_results

数学建模(可选)[编辑 | 编辑源代码]

对于需要计算存储成本的情况,可以使用:

Monthly Cost=i=1n(Blob Sizei×Storage Tier Rate10243×30)+Transactions×Operation Cost

其中:

  • Blob Sizei 是第i个Blob的大小(字节)
  • Storage Tier Rate 是每GB每月的存储费率
  • Transactions 是操作次数

总结[编辑 | 编辑源代码]

通过Airflow与Azure Blob的集成,数据工程师可以构建可靠的数据管道,实现自动化的工作流。本文涵盖了从基础配置到高级用例的全面内容,帮助用户快速上手并应用于实际生产环境。