跳转到内容

Airflow Variables基础

来自代码酷

Airflow Variables基础[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow Variables是Apache Airflow中的一种机制,用于存储和访问全局配置参数或动态值。它们允许用户将配置与代码分离,便于在不同环境(如开发、测试、生产)中复用任务逻辑。Variables可以存储字符串、JSON、字典等数据类型,并通过Airflow UI或代码进行管理。

Variables的核心特点:

  • 集中管理:通过Airflow元数据库存储,可在DAG间共享
  • 动态访问:运行时从数据库读取最新值
  • 安全存储:敏感信息可通过加密处理
  • 版本无关:修改变量值无需重新部署DAG

变量类型[编辑 | 编辑源代码]

Airflow支持两种变量存储方式:

类型 描述 适用场景
键值存储 简单的键值对 独立参数配置
JSON存储 结构化JSON数据 复杂配置或批量参数

基本操作[编辑 | 编辑源代码]

创建变量[编辑 | 编辑源代码]

可通过UI或代码创建变量:

方法1:Airflow UI 1. 导航到 Admin > Variables 2. 点击 "Create" 3. 输入Key和Value(支持JSON)

方法2:Python代码

from airflow.models import Variable

# 设置简单变量
Variable.set("api_endpoint", "https://example.com/api")

# 设置JSON变量
config = {"timeout": 300, "retries": 3}
Variable.set("job_config", config, serialize_json=True)

访问变量[编辑 | 编辑源代码]

from airflow.models import Variable

# 获取简单变量
endpoint = Variable.get("api_endpoint")  # 返回字符串

# 获取JSON变量(自动反序列化)
config = Variable.get("job_config", deserialize_json=True)  # 返回字典

# 带默认值访问
retries = Variable.get("max_retries", default_var=5)  # 键不存在时返回5

高级用法[编辑 | 编辑源代码]

变量继承[编辑 | 编辑源代码]

Airflow支持通过Variable.get()default_var参数实现变量继承:

graph TD A[尝试获取生产环境变量] -->|存在| B[使用生产配置] A -->|不存在| C[使用开发默认值]

示例:

env = Variable.get("environment", "dev")  # 默认为开发环境
db_url = Variable.get(f"{env}_database_url")

变量与宏结合[编辑 | 编辑源代码]

Variables可以在Jinja模板中通过宏访问:

task = BashOperator(
    task_id="process_data",
    bash_command="curl {{ var.value.api_endpoint }}",
    dag=dag
)

性能考虑[编辑 | 编辑源代码]

频繁访问变量可能影响性能,因为:

  • 每次Variable.get()都会查询数据库
  • 解决方案:
    • 在DAG文件中缓存变量值
    • 使用Variable.get()deserialize_json=True批量获取参数

最佳实践示例:

# 不推荐 - 多次查询
timeout = Variable.get("timeout")
retries = Variable.get("retries")

# 推荐 - 单次查询
config = Variable.get("task_params", deserialize_json=True)
timeout = config["timeout"]
retries = config["retries"]

安全注意事项[编辑 | 编辑源代码]

1. 敏感数据:不应在变量中存储明文密码,应使用Airflow的Connections或外部密钥管理 2. 访问控制:通过Airflow RBAC限制变量访问权限 3. 版本控制:变量变更应记录在配置管理系统

实际案例[编辑 | 编辑源代码]

场景:多环境配置管理[编辑 | 编辑源代码]

graph LR DAG -->|读取| DEV_VAR[dev_config] DAG -->|读取| PROD_VAR[prod_config] DEV_VAR -.->|环境检测| DAG PROD_VAR -.->|环境检测| DAG

实现代码:

from airflow.models import Variable

env = Variable.get("deployment_env", "dev")

# 根据环境加载不同配置
if env == "prod":
    config = Variable.get("prod_database", deserialize_json=True)
else:
    config = Variable.get("dev_database", deserialize_json=True)

# 使用配置
db_url = config["url"]
pool_size = config["pool_size"]

场景:动态任务参数[编辑 | 编辑源代码]

thresholds = Variable.get("alert_thresholds", deserialize_json=True)

email_alert = EmailOperator(
    task_id="alert",
    to="{{ var.value.alert_recipients }}",
    subject=f"值超过阈值 {thresholds['critical']}",
    ...
)

数学表示[编辑 | 编辑源代码]

变量访问可以表示为函数: V(k,d)={v如果变量 k 存在且值为 vd如果变量 k 不存在

其中:

  • k 为变量键
  • d 为默认值
  • v 为变量值

总结[编辑 | 编辑源代码]

Airflow Variables提供了灵活的配置管理方式,特别适合:

  • 环境特定的参数配置
  • 需要频繁调整的运行时参数
  • 跨DAG共享的公共配置

最佳实践组合:

  1. 将静态配置存储在Variables中
  2. 敏感信息使用Connections管理
  3. 对复杂配置使用JSON序列化
  4. 为生产环境设置适当的访问控制