跳转到内容

Airflow Connections概念

来自代码酷

Airflow Connections概念[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow Connections是Apache Airflow中用于管理与外部系统通信的认证凭据和连接参数的机制。它允许用户将敏感信息(如数据库密码、API密钥)集中存储,并在DAG中通过逻辑标识符引用,避免硬编码,提高安全性和可维护性。

连接信息存储在Airflow的元数据库(如PostgreSQL/MySQL)中,可通过Web UI、CLI或代码管理。每个连接包含以下核心属性:

  • Conn Id:唯一标识符(如`postgres_default`)
  • Conn Type:连接类型(如`postgres`、`http`、`s3`)
  • Host:目标系统地址
  • Login:用户名
  • Password:认证密码
  • Port:服务端口
  • Extra:JSON格式的额外参数

连接类型[编辑 | 编辑源代码]

Airflow内置支持多种连接类型(部分列表):

类型 用途 示例Conn Id
postgres PostgreSQL数据库 postgres_warehouse
mysql MySQL数据库 mysql_analytics
http HTTP API端点 api_weather
s3 AWS S3存储桶 s3_data_lake

代码示例[编辑 | 编辑源代码]

通过Hook使用连接[编辑 | 编辑源代码]

以下示例展示如何在DAG中使用PostgresHook通过连接访问数据库:

from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from datetime import datetime

def query_data():
    # 使用conn_id引用连接
    hook = PostgresHook(postgres_conn_id="postgres_analytics")
    records = hook.get_records("SELECT * FROM sales WHERE date > '2023-01-01'")
    for row in records:
        print(row)

with DAG('connection_demo', start_date=datetime(2023, 1, 1)) as dag:
    run_query = PythonOperator(
        task_id='run_query',
        python_callable=query_data
    )

输出示例[编辑 | 编辑源代码]

假设查询返回3条记录,控制台输出类似:

(1, '2023-01-05', 150.0)
(2, '2023-01-12', 89.99)
(3, '2023-01-18', 42.50)

管理连接[编辑 | 编辑源代码]

通过Web UI[编辑 | 编辑源代码]

1. 导航到 AdminConnections 2. 点击 + 添加新连接 3. 填写连接参数并保存

通过CLI[编辑 | 编辑源代码]

创建连接的命令行示例:

airflow connections add 'postgres_analytics' \
    --conn-type 'postgres' \
    --conn-host '192.168.1.100' \
    --conn-login 'airflow_user' \
    --conn-password 'secure123' \
    --conn-port 5432 \
    --conn-extra '{"sslmode": "require"}'

实际案例[编辑 | 编辑源代码]

场景:跨系统数据管道[编辑 | 编辑源代码]

需要从API获取天气数据并存入数据库的DAG:

graph LR A[HTTP Connection: api_weather] -->|提取数据| B(PythonOperator) B -->|转换数据| C[Postgres Connection: postgres_dwh]

实现代码片段:

def etl_weather():
    # 获取API数据
    http_hook = HttpHook(method='GET', http_conn_id='api_weather')
    response = http_hook.run('/v1/forecast?city=Berlin')
    
    # 处理数据
    data = response.json()['hourly']
    processed = [transform(item) for item in data]
    
    # 存入数据库
    pg_hook = PostgresHook('postgres_dwh')
    pg_hook.insert_rows('weather_data', processed)

高级配置[编辑 | 编辑源代码]

动态连接生成[编辑 | 编辑源代码]

可通过环境变量动态创建连接(适合K8s部署):

from airflow.models import Connection
from airflow import settings

def create_conn():
    conn = Connection(
        conn_id='dynamic_s3',
        conn_type='s3',
        extra={
            'aws_access_key_id': os.getenv('AWS_ACCESS_KEY'),
            'aws_secret_access_key': os.getenv('AWS_SECRET_KEY')
        }
    )
    session = settings.Session()
    session.add(conn)
    session.commit()

连接测试[编辑 | 编辑源代码]

使用`test_connection`命令验证配置:

airflow connections test postgres_analytics

成功时输出:

Testing connection `postgres_analytics`...
Success!

最佳实践[编辑 | 编辑源代码]

  • 使用命名约定(如`<service>_<purpose>`)
  • 通过Vault等工具管理敏感信息
  • 为不同环境(Dev/Prod)配置不同连接
  • 定期轮换凭据
  • 在`extra`字段中存储非标准参数

数学表示[编辑 | 编辑源代码]

连接可用性检查可建模为: 解析失败 (语法错误): {\displaystyle P(\text{conn\_works}) = 1 - \prod_{i=1}^{n}(1 - P(\text{component}_i)) } 其中componenti表示网络、认证、服务等独立组件。