Airflow任务上下文
外观
Airflow任务上下文[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow任务上下文(Task Context)是Apache Airflow中任务运行时自动提供的环境变量和元数据集合,包含DAG运行状态、任务实例信息、执行时间等关键数据。通过上下文,任务可以动态感知其执行环境,实现跨任务通信(如XComs)、条件分支控制等高级功能。这是Airflow实现工作流自动化的核心机制之一。
核心组件[编辑 | 编辑源代码]
任务上下文通过Python字典形式传递,主要包含以下键值:
变量名 | 类型 | 描述 |
---|---|---|
dag_run |
DagRun | 当前DAG运行实例 |
task_instance |
TaskInstance | 当前任务实例 |
execution_date |
datetime | 逻辑执行时间 |
params |
dict | DAG或任务级参数 |
ti |
TaskInstance | task_instance 的别名
|
访问上下文[编辑 | 编辑源代码]
在PythonOperator等可调用对象中,通过**kwargs
自动注入上下文:
from airflow.operators.python import PythonOperator
def print_context(**context):
print(f"Execution date: {context['execution_date']}")
print(f"DAG ID: {context['dag'].dag_id}")
print(f"Task ID: {context['task'].task_id}")
task = PythonOperator(
task_id="print_context",
python_callable=print_context,
provide_context=True, # Airflow 2.0+ 默认启用
)
输出示例:
Execution date: 2023-01-01 00:00:00+00:00 DAG ID: example_dag Task ID: print_context
上下文的实际应用[编辑 | 编辑源代码]
动态分支选择[编辑 | 编辑源代码]
利用execution_date
实现按时间分派任务:
def branch_func(**context):
if context["execution_date"].hour < 12:
return "morning_task"
else:
return "afternoon_task"
XCom跨任务通信[编辑 | 编辑源代码]
通过ti
(TaskInstance)访问XComs:
def push_data(**context):
context["ti"].xcom_push(key="sample", value=42)
def pull_data(**context):
value = context["ti"].xcom_pull(key="sample")
print(f"Received value: {value}")
高级用法[编辑 | 编辑源代码]
自定义上下文扩展[编辑 | 编辑源代码]
通过DAG
的default_args
注入自定义变量:
default_args = {
"owner": "admin",
"start_date": days_ago(1),
"custom_param": "EXTRA_DATA" # 会出现在上下文中
}
时间计算模式[编辑 | 编辑源代码]
使用execution_date
进行时间偏移计算(注意逻辑时间与实际运行时间的区别):
解析失败 (语法错误): {\displaystyle \text{数据窗口} = [\text{execution\_date}, \text{execution\_date} + \text{schedule\_interval}) }
可视化示例[编辑 | 编辑源代码]
常见问题[编辑 | 编辑源代码]
Q: 为什么我的自定义参数未出现在上下文中?
A: 确保参数通过default_args
或Operator的op_kwargs
传递,且任务设置了provide_context=True
(Airflow 1.x)。
Q: 如何区分逻辑时间和实际运行时间?
A: execution_date
是逻辑调度时间,实际运行时间可通过context['ts']
获取(格式为YYYY-MM-DDTHH:MM:SS+00:00)。
最佳实践[编辑 | 编辑源代码]
- 优先使用
**context
而非显式参数列表,保持向前兼容 - 对时间敏感操作始终基于
execution_date
而非当前系统时间 - 在单元测试中通过
airflow.utils.context.Context
模拟上下文