跳转到内容

Airflow任务上下文

来自代码酷

Airflow任务上下文[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow任务上下文(Task Context)是Apache Airflow中任务运行时自动提供的环境变量和元数据集合,包含DAG运行状态、任务实例信息、执行时间等关键数据。通过上下文,任务可以动态感知其执行环境,实现跨任务通信(如XComs)、条件分支控制等高级功能。这是Airflow实现工作流自动化的核心机制之一。

核心组件[编辑 | 编辑源代码]

任务上下文通过Python字典形式传递,主要包含以下键值:

关键上下文变量
变量名 类型 描述
dag_run DagRun 当前DAG运行实例
task_instance TaskInstance 当前任务实例
execution_date datetime 逻辑执行时间
params dict DAG或任务级参数
ti TaskInstance task_instance的别名

访问上下文[编辑 | 编辑源代码]

在PythonOperator等可调用对象中,通过**kwargs自动注入上下文:

from airflow.operators.python import PythonOperator

def print_context(**context):
    print(f"Execution date: {context['execution_date']}")
    print(f"DAG ID: {context['dag'].dag_id}")
    print(f"Task ID: {context['task'].task_id}")

task = PythonOperator(
    task_id="print_context",
    python_callable=print_context,
    provide_context=True,  # Airflow 2.0+ 默认启用
)

输出示例:

Execution date: 2023-01-01 00:00:00+00:00
DAG ID: example_dag
Task ID: print_context

上下文的实际应用[编辑 | 编辑源代码]

动态分支选择[编辑 | 编辑源代码]

利用execution_date实现按时间分派任务:

def branch_func(**context):
    if context["execution_date"].hour < 12:
        return "morning_task"
    else:
        return "afternoon_task"

XCom跨任务通信[编辑 | 编辑源代码]

通过ti(TaskInstance)访问XComs:

def push_data(**context):
    context["ti"].xcom_push(key="sample", value=42)

def pull_data(**context):
    value = context["ti"].xcom_pull(key="sample")
    print(f"Received value: {value}")

高级用法[编辑 | 编辑源代码]

自定义上下文扩展[编辑 | 编辑源代码]

通过DAGdefault_args注入自定义变量:

default_args = {
    "owner": "admin",
    "start_date": days_ago(1),
    "custom_param": "EXTRA_DATA"  # 会出现在上下文中
}

时间计算模式[编辑 | 编辑源代码]

使用execution_date进行时间偏移计算(注意逻辑时间与实际运行时间的区别): 解析失败 (语法错误): {\displaystyle \text{数据窗口} = [\text{execution\_date}, \text{execution\_date} + \text{schedule\_interval}) }

可视化示例[编辑 | 编辑源代码]

graph LR A[Task A] -->|xcom_push| B[Task B] B -->|context['ti'].xcom_pull| C[Task C] style A fill:#f9f,stroke:#333 style B fill:#bbf,stroke:#333 style C fill:#9f9,stroke:#333

常见问题[编辑 | 编辑源代码]

Q: 为什么我的自定义参数未出现在上下文中? A: 确保参数通过default_args或Operator的op_kwargs传递,且任务设置了provide_context=True(Airflow 1.x)。

Q: 如何区分逻辑时间和实际运行时间? A: execution_date是逻辑调度时间,实际运行时间可通过context['ts']获取(格式为YYYY-MM-DDTHH:MM:SS+00:00)。

最佳实践[编辑 | 编辑源代码]

  • 优先使用**context而非显式参数列表,保持向前兼容
  • 对时间敏感操作始终基于execution_date而非当前系统时间
  • 在单元测试中通过airflow.utils.context.Context模拟上下文

参见[编辑 | 编辑源代码]