Airflow Variables管理
外观
Airflow Variables管理[编辑 | 编辑源代码]
Airflow Variables是Apache Airflow中用于存储和访问全局配置参数的键值对系统。它们允许用户动态管理工作流中的配置值,而无需硬编码到DAG文件中,从而提高代码的可维护性和灵活性。
基本概念[编辑 | 编辑源代码]
Variables在Airflow中扮演以下角色:
- 集中存储环境特定参数(如文件路径、API密钥)
- 在不同DAG之间共享配置
- 通过UI或CLI动态更新值而不修改代码
变量类型[编辑 | 编辑源代码]
Airflow支持存储多种数据类型:
- 字符串(默认)
- JSON(可存储复杂结构)
- 数值
- 布尔值
变量操作[编辑 | 编辑源代码]
创建变量[编辑 | 编辑源代码]
可通过三种方式创建变量:
1. Web界面 在Airflow UI的Admin → Variables中手动添加
2. CLI
airflow variables set 'my_var' 'example_value'
3. Python API
from airflow.models import Variable
Variable.set("database_url", "postgresql://user:pass@localhost:5432/db")
访问变量[编辑 | 编辑源代码]
在DAG中使用:
from airflow.models import Variable
# 直接访问
db_url = Variable.get("database_url")
# 带默认值
timeout = Variable.get("timeout", default_var=300)
# 反序列化JSON
config = Variable.get("config", deserialize_json=True)
高级用法[编辑 | 编辑源代码]
变量继承[编辑 | 编辑源代码]
Airflow支持变量层级结构:
加密变量[编辑 | 编辑源代码]
敏感信息应使用加密存储:
from airflow.models import Variable
from cryptography.fernet import Fernet
key = Fernet.generate_key()
cipher_suite = Fernet(key)
encrypted = cipher_suite.encrypt(b"secret_value")
Variable.set("api_key", encrypted.decode())
最佳实践[编辑 | 编辑源代码]
1. 命名规范:使用下划线分隔的命名方式(如`max_retry_count`)
2. 敏感数据:结合Airflow的Connections功能管理凭证
3. 性能考虑:频繁访问的变量应缓存到本地
4. 版本控制:通过airflow variables export/import
实现配置迁移
实际案例[编辑 | 编辑源代码]
场景:跨环境部署ETL管道
1. 定义环境特定变量:
{
"dev": {
"input_path": "/data/dev/input",
"output_path": "/data/dev/output"
},
"prod": {
"input_path": "/data/prod/input",
"output_path": "/data/prod/output"
}
}
2. DAG中使用:
env = Variable.get("environment") # 'dev' 或 'prod'
config = Variable.get(f"{env}_config", deserialize_json=True)
def process_data(**kwargs):
input_path = config["input_path"]
output_path = config["output_path"]
# 处理逻辑...
数学表示[编辑 | 编辑源代码]
变量存储可形式化为键值映射:
常见问题[编辑 | 编辑源代码]
Q:变量与XCom有何区别?
- 变量:全局配置,长期存储
- XCom:任务间临时通信,短期存在
Q:如何批量管理变量? 使用JSON文件导入/导出:
airflow variables import variables.json
airflow variables export variables.json
性能注意事项[编辑 | 编辑源代码]
频繁访问变量会导致数据库查询增加,建议:
- 在任务开始时获取并缓存变量
- 对静态配置使用DAG顶层的变量获取
- 考虑使用
Variable.get()
的deserialize_json=True
参数减少调用次数