Airflow Connections概念
外观
Airflow Connections概念[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow Connections是Apache Airflow中用于管理与外部系统通信的认证凭据和连接参数的机制。它允许用户将敏感信息(如数据库密码、API密钥)集中存储,并在DAG中通过逻辑标识符引用,避免硬编码,提高安全性和可维护性。
连接信息存储在Airflow的元数据库(如PostgreSQL/MySQL)中,可通过Web UI、CLI或代码管理。每个连接包含以下核心属性:
- Conn Id:唯一标识符(如`postgres_default`)
- Conn Type:连接类型(如`postgres`、`http`、`s3`)
- Host:目标系统地址
- Login:用户名
- Password:认证密码
- Port:服务端口
- Extra:JSON格式的额外参数
连接类型[编辑 | 编辑源代码]
Airflow内置支持多种连接类型(部分列表):
类型 | 用途 | 示例Conn Id |
---|---|---|
postgres | PostgreSQL数据库 | postgres_warehouse |
mysql | MySQL数据库 | mysql_analytics |
http | HTTP API端点 | api_weather |
s3 | AWS S3存储桶 | s3_data_lake |
代码示例[编辑 | 编辑源代码]
通过Hook使用连接[编辑 | 编辑源代码]
以下示例展示如何在DAG中使用PostgresHook通过连接访问数据库:
from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from datetime import datetime
def query_data():
# 使用conn_id引用连接
hook = PostgresHook(postgres_conn_id="postgres_analytics")
records = hook.get_records("SELECT * FROM sales WHERE date > '2023-01-01'")
for row in records:
print(row)
with DAG('connection_demo', start_date=datetime(2023, 1, 1)) as dag:
run_query = PythonOperator(
task_id='run_query',
python_callable=query_data
)
输出示例[编辑 | 编辑源代码]
假设查询返回3条记录,控制台输出类似:
(1, '2023-01-05', 150.0) (2, '2023-01-12', 89.99) (3, '2023-01-18', 42.50)
管理连接[编辑 | 编辑源代码]
通过Web UI[编辑 | 编辑源代码]
1. 导航到 Admin → Connections 2. 点击 + 添加新连接 3. 填写连接参数并保存
通过CLI[编辑 | 编辑源代码]
创建连接的命令行示例:
airflow connections add 'postgres_analytics' \
--conn-type 'postgres' \
--conn-host '192.168.1.100' \
--conn-login 'airflow_user' \
--conn-password 'secure123' \
--conn-port 5432 \
--conn-extra '{"sslmode": "require"}'
实际案例[编辑 | 编辑源代码]
场景:跨系统数据管道[编辑 | 编辑源代码]
需要从API获取天气数据并存入数据库的DAG:
实现代码片段:
def etl_weather():
# 获取API数据
http_hook = HttpHook(method='GET', http_conn_id='api_weather')
response = http_hook.run('/v1/forecast?city=Berlin')
# 处理数据
data = response.json()['hourly']
processed = [transform(item) for item in data]
# 存入数据库
pg_hook = PostgresHook('postgres_dwh')
pg_hook.insert_rows('weather_data', processed)
高级配置[编辑 | 编辑源代码]
动态连接生成[编辑 | 编辑源代码]
可通过环境变量动态创建连接(适合K8s部署):
from airflow.models import Connection
from airflow import settings
def create_conn():
conn = Connection(
conn_id='dynamic_s3',
conn_type='s3',
extra={
'aws_access_key_id': os.getenv('AWS_ACCESS_KEY'),
'aws_secret_access_key': os.getenv('AWS_SECRET_KEY')
}
)
session = settings.Session()
session.add(conn)
session.commit()
连接测试[编辑 | 编辑源代码]
使用`test_connection`命令验证配置:
airflow connections test postgres_analytics
成功时输出:
Testing connection `postgres_analytics`... Success!
最佳实践[编辑 | 编辑源代码]
- 使用命名约定(如`<service>_<purpose>`)
- 通过Vault等工具管理敏感信息
- 为不同环境(Dev/Prod)配置不同连接
- 定期轮换凭据
- 在`extra`字段中存储非标准参数
数学表示[编辑 | 编辑源代码]
连接可用性检查可建模为: 解析失败 (语法错误): {\displaystyle P(\text{conn\_works}) = 1 - \prod_{i=1}^{n}(1 - P(\text{component}_i)) } 其中表示网络、认证、服务等独立组件。