Airflow Connections类型
外观
Airflow Connections类型[编辑 | 编辑源代码]
Airflow Connections是Apache Airflow中用于存储和管理外部系统连接凭据的核心组件。它允许用户以安全、集中化的方式配置数据库、API、云服务等外部资源的访问信息,避免在DAG代码中硬编码敏感数据。本条目将详细介绍Connection的类型体系、配置方法及实际应用。
核心概念[编辑 | 编辑源代码]
什么是Connection?[编辑 | 编辑源代码]
在Airflow中,Connection是一个包含以下要素的元数据实体:
- Conn Type:连接类型(如MySQL、HTTP、S3等)
- Host:目标系统地址
- Login:认证用户名(可选)
- Password:认证密码(可选)
- Extra:JSON格式的附加参数
这些信息被加密后存储在Airflow元数据库中,通过唯一的conn_id进行引用。
为什么需要Connection?[编辑 | 编辑源代码]
传统方式直接在代码中写入连接信息会导致:
- 安全隐患(代码库泄露敏感信息)
- 维护困难(多环境切换需修改代码)
- 缺乏复用性
Airflow Connections通过解耦配置与逻辑解决了这些问题。
Connection类型全览[编辑 | 编辑源代码]
Airflow内置支持超过60种连接类型,主要分为以下类别:
数据库类[编辑 | 编辑源代码]
类型 | 描述 | 必要参数示例 |
---|---|---|
Postgres | PostgreSQL数据库 | host, port, schema |
MySQL | MySQL数据库 | host, port, database |
Oracle | Oracle数据库 | host, port, service_name |
MSSQL | Microsoft SQL Server | host, port, schema |
SQLite | SQLite数据库 | host (文件路径) |
云服务类[编辑 | 编辑源代码]
类型 | 描述 | 必要参数示例 |
---|---|---|
AWS | Amazon Web Services | aws_access_key_id, aws_secret_access_key |
GCP | Google Cloud Platform | keyfile_json, project_id |
Azure | Microsoft Azure | login (client_id), password (secret), extra (tenant) |
S3 | Amazon S3存储 | host (自定义端点), extra (region_name) |
消息队列类[编辑 | 编辑源代码]
类型 | 描述 | 必要参数示例 |
---|---|---|
RabbitMQ | AMQP协议消息队列 | host, port, login/password |
Kafka | Apache Kafka | host, extra (config) |
API类[编辑 | 编辑源代码]
类型 | 描述 | 必要参数示例 |
---|---|---|
HTTP | 通用HTTP服务 | host (基础URL) |
JIRA | Atlassian JIRA | host, port, extra (project) |
配置Connection[编辑 | 编辑源代码]
Web UI方式[编辑 | 编辑源代码]
通过Admin → Connections界面可视化配置:
CLI方式[编辑 | 编辑源代码]
使用airflow connections
命令:
# 添加Postgres连接
airflow connections add \
--conn-type postgres \
--conn-host localhost \
--conn-login airflow \
--conn-password secure123 \
--conn-port 5432 \
--conn-schema airflow_db \
pg_prod
环境变量方式[编辑 | 编辑源代码]
命名规则:AIRFLOW_CONN_{CONN_ID}
export AIRFLOW_CONN_MY_DB="postgres://user:pass@localhost:5432/db?param=value"
代码示例[编辑 | 编辑源代码]
在Operator中使用[编辑 | 编辑源代码]
from airflow.providers.postgres.operators.postgres import PostgresOperator
query_task = PostgresOperator(
task_id="run_query",
postgres_conn_id="pg_prod", # 引用Connection
sql="SELECT * FROM important_table;"
)
直接获取Connection对象[编辑 | 编辑源代码]
from airflow.hooks.base import BaseHook
def use_connection():
conn = BaseHook.get_connection("my_conn")
print(f"Host: {conn.host}, Port: {conn.port}")
# 实际连接需使用对应Hook(如PostgresHook)
高级配置[编辑 | 编辑源代码]
Extra字段详解[编辑 | 编辑源代码]
以Snowflake连接为例:
{
"account": "xy12345",
"warehouse": "COMPUTE_WH",
"database": "ANALYTICS",
"region": "us-west-2",
"role": "LOADER"
}
动态Connection生成[编辑 | 编辑源代码]
通过环境变量动态创建:
from airflow.models import Connection
from airflow import settings
def create_temp_conn():
conn = Connection(
conn_id="temp_api",
conn_type="http",
host="https://api.example.com/v2",
extra={"timeout": 60}
)
session = settings.Session()
session.add(conn)
session.commit()
最佳实践[编辑 | 编辑源代码]
- 命名规范:使用
service_env
格式(如redshift_dev
) - 权限控制:限制敏感Connection的访问权限
- 轮换策略:定期更新密码/密钥
- 测试隔离:为测试环境创建独立Connection
故障排查[编辑 | 编辑源代码]
常见问题及解决方案:
- 连接失败:检查网络可达性、防火墙规则
- 认证错误:验证凭据是否过期
- 参数错误:确认Extra字段格式符合目标系统要求
使用测试命令验证:
airflow connections test pg_prod
实际案例[编辑 | 编辑源代码]
场景:跨云数据管道[编辑 | 编辑源代码]
配置示例:
transfer_task = GCSToRedshiftOperator(
task_id="gcs_to_redshift",
gcp_conn_id="gcp_analytics",
redshift_conn_id="redshift_dw",
...
)
通过合理使用Connection,可以构建复杂的数据管道而无需暴露任何凭据信息。