跳转到内容

Airflow Connections类型

来自代码酷

Airflow Connections类型[编辑 | 编辑源代码]

Airflow Connections是Apache Airflow中用于存储和管理外部系统连接凭据的核心组件。它允许用户以安全、集中化的方式配置数据库、API、云服务等外部资源的访问信息,避免在DAG代码中硬编码敏感数据。本条目将详细介绍Connection的类型体系、配置方法及实际应用。

核心概念[编辑 | 编辑源代码]

什么是Connection?[编辑 | 编辑源代码]

在Airflow中,Connection是一个包含以下要素的元数据实体:

  • Conn Type:连接类型(如MySQL、HTTP、S3等)
  • Host:目标系统地址
  • Login:认证用户名(可选)
  • Password:认证密码(可选)
  • Extra:JSON格式的附加参数

这些信息被加密后存储在Airflow元数据库中,通过唯一的conn_id进行引用。

为什么需要Connection?[编辑 | 编辑源代码]

传统方式直接在代码中写入连接信息会导致:

  1. 安全隐患(代码库泄露敏感信息)
  2. 维护困难(多环境切换需修改代码)
  3. 缺乏复用性

Airflow Connections通过解耦配置与逻辑解决了这些问题。

Connection类型全览[编辑 | 编辑源代码]

Airflow内置支持超过60种连接类型,主要分为以下类别:

数据库类[编辑 | 编辑源代码]

类型 描述 必要参数示例
Postgres PostgreSQL数据库 host, port, schema
MySQL MySQL数据库 host, port, database
Oracle Oracle数据库 host, port, service_name
MSSQL Microsoft SQL Server host, port, schema
SQLite SQLite数据库 host (文件路径)

云服务类[编辑 | 编辑源代码]

类型 描述 必要参数示例
AWS Amazon Web Services aws_access_key_id, aws_secret_access_key
GCP Google Cloud Platform keyfile_json, project_id
Azure Microsoft Azure login (client_id), password (secret), extra (tenant)
S3 Amazon S3存储 host (自定义端点), extra (region_name)

消息队列类[编辑 | 编辑源代码]

类型 描述 必要参数示例
RabbitMQ AMQP协议消息队列 host, port, login/password
Kafka Apache Kafka host, extra (config)

API类[编辑 | 编辑源代码]

类型 描述 必要参数示例
HTTP 通用HTTP服务 host (基础URL)
JIRA Atlassian JIRA host, port, extra (project)

配置Connection[编辑 | 编辑源代码]

Web UI方式[编辑 | 编辑源代码]

通过Admin → Connections界面可视化配置:

graph TD A[点击+按钮] --> B[选择Conn Type] B --> C[填写host/login等] C --> D[保存conn_id]

CLI方式[编辑 | 编辑源代码]

使用airflow connections命令:

# 添加Postgres连接
airflow connections add \
    --conn-type postgres \
    --conn-host localhost \
    --conn-login airflow \
    --conn-password secure123 \
    --conn-port 5432 \
    --conn-schema airflow_db \
    pg_prod

环境变量方式[编辑 | 编辑源代码]

命名规则:AIRFLOW_CONN_{CONN_ID}

export AIRFLOW_CONN_MY_DB="postgres://user:pass@localhost:5432/db?param=value"

代码示例[编辑 | 编辑源代码]

在Operator中使用[编辑 | 编辑源代码]

from airflow.providers.postgres.operators.postgres import PostgresOperator

query_task = PostgresOperator(
    task_id="run_query",
    postgres_conn_id="pg_prod",  # 引用Connection
    sql="SELECT * FROM important_table;"
)

直接获取Connection对象[编辑 | 编辑源代码]

from airflow.hooks.base import BaseHook

def use_connection():
    conn = BaseHook.get_connection("my_conn")
    print(f"Host: {conn.host}, Port: {conn.port}")
    # 实际连接需使用对应Hook(如PostgresHook)

高级配置[编辑 | 编辑源代码]

Extra字段详解[编辑 | 编辑源代码]

以Snowflake连接为例:

{
  "account": "xy12345",
  "warehouse": "COMPUTE_WH",
  "database": "ANALYTICS",
  "region": "us-west-2",
  "role": "LOADER"
}

动态Connection生成[编辑 | 编辑源代码]

通过环境变量动态创建:

from airflow.models import Connection
from airflow import settings

def create_temp_conn():
    conn = Connection(
        conn_id="temp_api",
        conn_type="http",
        host="https://api.example.com/v2",
        extra={"timeout": 60}
    )
    session = settings.Session()
    session.add(conn)
    session.commit()

最佳实践[编辑 | 编辑源代码]

  1. 命名规范:使用service_env格式(如redshift_dev
  2. 权限控制:限制敏感Connection的访问权限
  3. 轮换策略:定期更新密码/密钥
  4. 测试隔离:为测试环境创建独立Connection

故障排查[编辑 | 编辑源代码]

常见问题及解决方案:

  • 连接失败:检查网络可达性、防火墙规则
  • 认证错误:验证凭据是否过期
  • 参数错误:确认Extra字段格式符合目标系统要求

使用测试命令验证:

airflow connections test pg_prod

实际案例[编辑 | 编辑源代码]

场景:跨云数据管道[编辑 | 编辑源代码]

graph LR A[GCP BigQuery] -- GCP Connection --> B((Airflow)) B -- AWS Connection --> C[Redshift] B -- SFTP Connection --> D[Legacy System]

配置示例:

transfer_task = GCSToRedshiftOperator(
    task_id="gcs_to_redshift",
    gcp_conn_id="gcp_analytics",
    redshift_conn_id="redshift_dw",
    ...
)

通过合理使用Connection,可以构建复杂的数据管道而无需暴露任何凭据信息。