Airflow Variables基础
外观
Airflow Variables基础[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow Variables是Apache Airflow中的一种机制,用于存储和访问全局配置参数或动态值。它们允许用户将配置与代码分离,便于在不同环境(如开发、测试、生产)中复用任务逻辑。Variables可以存储字符串、JSON、字典等数据类型,并通过Airflow UI或代码进行管理。
Variables的核心特点:
- 集中管理:通过Airflow元数据库存储,可在DAG间共享
- 动态访问:运行时从数据库读取最新值
- 安全存储:敏感信息可通过加密处理
- 版本无关:修改变量值无需重新部署DAG
变量类型[编辑 | 编辑源代码]
Airflow支持两种变量存储方式:
类型 | 描述 | 适用场景 |
---|---|---|
键值存储 | 简单的键值对 | 独立参数配置 |
JSON存储 | 结构化JSON数据 | 复杂配置或批量参数 |
基本操作[编辑 | 编辑源代码]
创建变量[编辑 | 编辑源代码]
可通过UI或代码创建变量:
方法1:Airflow UI 1. 导航到 Admin > Variables 2. 点击 "Create" 3. 输入Key和Value(支持JSON)
方法2:Python代码
from airflow.models import Variable
# 设置简单变量
Variable.set("api_endpoint", "https://example.com/api")
# 设置JSON变量
config = {"timeout": 300, "retries": 3}
Variable.set("job_config", config, serialize_json=True)
访问变量[编辑 | 编辑源代码]
from airflow.models import Variable
# 获取简单变量
endpoint = Variable.get("api_endpoint") # 返回字符串
# 获取JSON变量(自动反序列化)
config = Variable.get("job_config", deserialize_json=True) # 返回字典
# 带默认值访问
retries = Variable.get("max_retries", default_var=5) # 键不存在时返回5
高级用法[编辑 | 编辑源代码]
变量继承[编辑 | 编辑源代码]
Airflow支持通过Variable.get()
的default_var
参数实现变量继承:
示例:
env = Variable.get("environment", "dev") # 默认为开发环境
db_url = Variable.get(f"{env}_database_url")
变量与宏结合[编辑 | 编辑源代码]
Variables可以在Jinja模板中通过宏访问:
task = BashOperator(
task_id="process_data",
bash_command="curl {{ var.value.api_endpoint }}",
dag=dag
)
性能考虑[编辑 | 编辑源代码]
频繁访问变量可能影响性能,因为:
- 每次
Variable.get()
都会查询数据库 - 解决方案:
- 在DAG文件中缓存变量值
- 使用
Variable.get()
的deserialize_json=True
批量获取参数
最佳实践示例:
# 不推荐 - 多次查询
timeout = Variable.get("timeout")
retries = Variable.get("retries")
# 推荐 - 单次查询
config = Variable.get("task_params", deserialize_json=True)
timeout = config["timeout"]
retries = config["retries"]
安全注意事项[编辑 | 编辑源代码]
1. 敏感数据:不应在变量中存储明文密码,应使用Airflow的Connections或外部密钥管理 2. 访问控制:通过Airflow RBAC限制变量访问权限 3. 版本控制:变量变更应记录在配置管理系统
实际案例[编辑 | 编辑源代码]
场景:多环境配置管理[编辑 | 编辑源代码]
实现代码:
from airflow.models import Variable
env = Variable.get("deployment_env", "dev")
# 根据环境加载不同配置
if env == "prod":
config = Variable.get("prod_database", deserialize_json=True)
else:
config = Variable.get("dev_database", deserialize_json=True)
# 使用配置
db_url = config["url"]
pool_size = config["pool_size"]
场景:动态任务参数[编辑 | 编辑源代码]
thresholds = Variable.get("alert_thresholds", deserialize_json=True)
email_alert = EmailOperator(
task_id="alert",
to="{{ var.value.alert_recipients }}",
subject=f"值超过阈值 {thresholds['critical']}",
...
)
数学表示[编辑 | 编辑源代码]
变量访问可以表示为函数:
其中:
- 为变量键
- 为默认值
- 为变量值
总结[编辑 | 编辑源代码]
Airflow Variables提供了灵活的配置管理方式,特别适合:
- 环境特定的参数配置
- 需要频繁调整的运行时参数
- 跨DAG共享的公共配置
最佳实践组合:
- 将静态配置存储在Variables中
- 敏感信息使用Connections管理
- 对复杂配置使用JSON序列化
- 为生产环境设置适当的访问控制