跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow Variables基础
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow Variables基础 = == 介绍 == '''Airflow Variables'''是Apache Airflow中的一种机制,用于存储和访问全局配置参数或动态值。它们允许用户将配置与代码分离,便于在不同环境(如开发、测试、生产)中复用任务逻辑。Variables可以存储字符串、JSON、字典等数据类型,并通过Airflow UI或代码进行管理。 Variables的核心特点: * '''集中管理''':通过Airflow元数据库存储,可在DAG间共享 * '''动态访问''':运行时从数据库读取最新值 * '''安全存储''':敏感信息可通过加密处理 * '''版本无关''':修改变量值无需重新部署DAG == 变量类型 == Airflow支持两种变量存储方式: {| class="wikitable" |- ! 类型 !! 描述 !! 适用场景 |- | '''键值存储''' || 简单的键值对 || 独立参数配置 |- | '''JSON存储''' || 结构化JSON数据 || 复杂配置或批量参数 |} == 基本操作 == === 创建变量 === 可通过UI或代码创建变量: '''方法1:Airflow UI''' 1. 导航到 ''Admin > Variables'' 2. 点击 "Create" 3. 输入Key和Value(支持JSON) '''方法2:Python代码''' <syntaxhighlight lang="python"> from airflow.models import Variable # 设置简单变量 Variable.set("api_endpoint", "https://example.com/api") # 设置JSON变量 config = {"timeout": 300, "retries": 3} Variable.set("job_config", config, serialize_json=True) </syntaxhighlight> === 访问变量 === <syntaxhighlight lang="python"> from airflow.models import Variable # 获取简单变量 endpoint = Variable.get("api_endpoint") # 返回字符串 # 获取JSON变量(自动反序列化) config = Variable.get("job_config", deserialize_json=True) # 返回字典 # 带默认值访问 retries = Variable.get("max_retries", default_var=5) # 键不存在时返回5 </syntaxhighlight> == 高级用法 == === 变量继承 === Airflow支持通过<code>Variable.get()</code>的<code>default_var</code>参数实现变量继承: <mermaid> graph TD A[尝试获取生产环境变量] -->|存在| B[使用生产配置] A -->|不存在| C[使用开发默认值] </mermaid> 示例: <syntaxhighlight lang="python"> env = Variable.get("environment", "dev") # 默认为开发环境 db_url = Variable.get(f"{env}_database_url") </syntaxhighlight> === 变量与宏结合 === Variables可以在Jinja模板中通过宏访问: <syntaxhighlight lang="python"> task = BashOperator( task_id="process_data", bash_command="curl {{ var.value.api_endpoint }}", dag=dag ) </syntaxhighlight> == 性能考虑 == 频繁访问变量可能影响性能,因为: * 每次<code>Variable.get()</code>都会查询数据库 * 解决方案: ** 在DAG文件中缓存变量值 ** 使用<code>Variable.get()</code>的<code>deserialize_json=True</code>批量获取参数 最佳实践示例: <syntaxhighlight lang="python"> # 不推荐 - 多次查询 timeout = Variable.get("timeout") retries = Variable.get("retries") # 推荐 - 单次查询 config = Variable.get("task_params", deserialize_json=True) timeout = config["timeout"] retries = config["retries"] </syntaxhighlight> == 安全注意事项 == 1. '''敏感数据''':不应在变量中存储明文密码,应使用Airflow的'''Connections'''或外部密钥管理 2. '''访问控制''':通过Airflow RBAC限制变量访问权限 3. '''版本控制''':变量变更应记录在配置管理系统 == 实际案例 == === 场景:多环境配置管理 === <mermaid> graph LR DAG -->|读取| DEV_VAR[dev_config] DAG -->|读取| PROD_VAR[prod_config] DEV_VAR -.->|环境检测| DAG PROD_VAR -.->|环境检测| DAG </mermaid> 实现代码: <syntaxhighlight lang="python"> from airflow.models import Variable env = Variable.get("deployment_env", "dev") # 根据环境加载不同配置 if env == "prod": config = Variable.get("prod_database", deserialize_json=True) else: config = Variable.get("dev_database", deserialize_json=True) # 使用配置 db_url = config["url"] pool_size = config["pool_size"] </syntaxhighlight> === 场景:动态任务参数 === <syntaxhighlight lang="python"> thresholds = Variable.get("alert_thresholds", deserialize_json=True) email_alert = EmailOperator( task_id="alert", to="{{ var.value.alert_recipients }}", subject=f"值超过阈值 {thresholds['critical']}", ... ) </syntaxhighlight> == 数学表示 == 变量访问可以表示为函数: <math> V(k,d) = \begin{cases} v & \text{如果变量 } k \text{ 存在且值为 } v \\ d & \text{如果变量 } k \text{ 不存在} \end{cases} </math> 其中: * <math>k</math> 为变量键 * <math>d</math> 为默认值 * <math>v</math> 为变量值 == 总结 == Airflow Variables提供了灵活的配置管理方式,特别适合: * 环境特定的参数配置 * 需要频繁调整的运行时参数 * 跨DAG共享的公共配置 最佳实践组合: # 将静态配置存储在Variables中 # 敏感信息使用Connections管理 # 对复杂配置使用JSON序列化 # 为生产环境设置适当的访问控制 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow变量与连接]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)