跳转到内容

Airflow PythonOperator

来自代码酷

Airflow PythonOperator[编辑 | 编辑源代码]

概述[编辑 | 编辑源代码]

PythonOperator 是 Apache Airflow 中最常用的操作符之一,允许用户执行任意的 Python 函数作为任务(Task)。它是 DAG(有向无环图)开发的核心组件之一,特别适合需要在工作流中运行 Python 代码的场景。PythonOperator 通过调用用户定义的 Python 函数来完成任务,并支持参数传递、依赖管理和上下文访问。

基本语法[编辑 | 编辑源代码]

PythonOperator 的基本语法如下:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def my_python_function(**kwargs):
    print("Hello from PythonOperator!")
    return "Task executed successfully"

with DAG(
    dag_id="python_operator_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    task = PythonOperator(
        task_id="python_task",
        python_callable=my_python_function,
        provide_context=True,  # 允许传递上下文(如 `**kwargs`)
    )

参数说明[编辑 | 编辑源代码]

  • task_id:任务的唯一标识符。
  • python_callable:要执行的 Python 函数。
  • op_kwargs(可选):传递给 `python_callable` 的额外参数(字典形式)。
  • provide_context(可选):如果为 `True`,Airflow 会将任务上下文(如 `execution_date`)作为 `**kwargs` 传递给函数。

实际案例[编辑 | 编辑源代码]

案例 1:简单任务[编辑 | 编辑源代码]

以下示例展示如何使用 PythonOperator 打印当前日期:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_date(**kwargs):
    execution_date = kwargs.get("execution_date")
    print(f"Execution date: {execution_date}")
    return "Date printed"

with DAG(
    dag_id="print_date_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    print_task = PythonOperator(
        task_id="print_date_task",
        python_callable=print_date,
        provide_context=True,
    )

输出:

Execution date: 2023-01-01T00:00:00+00:00

案例 2:数据处理任务[编辑 | 编辑源代码]

PythonOperator 可用于执行数据转换或分析任务。以下示例计算两个数的和:

def calculate_sum(a, b, **kwargs):
    result = a + b
    print(f"Sum of {a} and {b} is {result}")
    return result

with DAG(
    dag_id="calculate_sum_dag",
    start_date=datetime(2023, 1, 1),
) as dag:
    sum_task = PythonOperator(
        task_id="calculate_sum_task",
        python_callable=calculate_sum,
        op_kwargs={"a": 5, "b": 3},
    )

输出:

Sum of 5 and 3 is 8

高级用法[编辑 | 编辑源代码]

访问 Airflow 上下文[编辑 | 编辑源代码]

PythonOperator 可以通过 `provide_context=True` 访问 Airflow 的上下文变量,如 `execution_date`、`dag_run` 等:

def log_context(**kwargs):
    ti = kwargs.get("ti")  # TaskInstance 对象
    dag_run = kwargs.get("dag_run")  # DagRun 对象
    print(f"Task ID: {ti.task_id}")
    print(f"DAG Run ID: {dag_run.run_id}")

with DAG(...) as dag:
    context_task = PythonOperator(
        task_id="log_context_task",
        python_callable=log_context,
        provide_context=True,
    )

返回值与 XCom[编辑 | 编辑源代码]

PythonOperator 的返回值会自动推送到 Airflow 的 XCom(跨任务通信)系统,供后续任务使用:

def generate_data(**kwargs):
    return {"data": [1, 2, 3]}

def process_data(**kwargs):
    ti = kwargs.get("ti")
    data = ti.xcom_pull(task_ids="generate_data_task")
    print(f"Processing data: {data}")

with DAG(...) as dag:
    generate_task = PythonOperator(
        task_id="generate_data_task",
        python_callable=generate_data,
    )
    process_task = PythonOperator(
        task_id="process_data_task",
        python_callable=process_data,
        provide_context=True,
    )
    generate_task >> process_task

输出:

Processing data: {'data': [1, 2, 3]}

注意事项[编辑 | 编辑源代码]

  • PythonOperator 适用于轻量级任务,长时间运行的任务应考虑使用其他操作符(如 `BashOperator` 或 `KubernetesPodOperator`)。
  • 确保 Python 函数是幂等的(多次执行不会产生副作用)。
  • 避免在函数中使用全局变量,以防止并发问题。

总结[编辑 | 编辑源代码]

PythonOperator 是 Airflow DAG 开发的核心工具,适用于执行 Python 函数任务。它支持参数传递、上下文访问和 XCom 通信,适合数据处理、API 调用等场景。通过合理使用 PythonOperator,可以构建灵活且可维护的工作流。

参见[编辑 | 编辑源代码]