Airflow PythonOperator
外观
Airflow PythonOperator[编辑 | 编辑源代码]
概述[编辑 | 编辑源代码]
PythonOperator 是 Apache Airflow 中最常用的操作符之一,允许用户执行任意的 Python 函数作为任务(Task)。它是 DAG(有向无环图)开发的核心组件之一,特别适合需要在工作流中运行 Python 代码的场景。PythonOperator 通过调用用户定义的 Python 函数来完成任务,并支持参数传递、依赖管理和上下文访问。
基本语法[编辑 | 编辑源代码]
PythonOperator 的基本语法如下:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def my_python_function(**kwargs):
print("Hello from PythonOperator!")
return "Task executed successfully"
with DAG(
dag_id="python_operator_example",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
) as dag:
task = PythonOperator(
task_id="python_task",
python_callable=my_python_function,
provide_context=True, # 允许传递上下文(如 `**kwargs`)
)
参数说明[编辑 | 编辑源代码]
- task_id:任务的唯一标识符。
- python_callable:要执行的 Python 函数。
- op_kwargs(可选):传递给 `python_callable` 的额外参数(字典形式)。
- provide_context(可选):如果为 `True`,Airflow 会将任务上下文(如 `execution_date`)作为 `**kwargs` 传递给函数。
实际案例[编辑 | 编辑源代码]
案例 1:简单任务[编辑 | 编辑源代码]
以下示例展示如何使用 PythonOperator 打印当前日期:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_date(**kwargs):
execution_date = kwargs.get("execution_date")
print(f"Execution date: {execution_date}")
return "Date printed"
with DAG(
dag_id="print_date_dag",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
) as dag:
print_task = PythonOperator(
task_id="print_date_task",
python_callable=print_date,
provide_context=True,
)
输出:
Execution date: 2023-01-01T00:00:00+00:00
案例 2:数据处理任务[编辑 | 编辑源代码]
PythonOperator 可用于执行数据转换或分析任务。以下示例计算两个数的和:
def calculate_sum(a, b, **kwargs):
result = a + b
print(f"Sum of {a} and {b} is {result}")
return result
with DAG(
dag_id="calculate_sum_dag",
start_date=datetime(2023, 1, 1),
) as dag:
sum_task = PythonOperator(
task_id="calculate_sum_task",
python_callable=calculate_sum,
op_kwargs={"a": 5, "b": 3},
)
输出:
Sum of 5 and 3 is 8
高级用法[编辑 | 编辑源代码]
访问 Airflow 上下文[编辑 | 编辑源代码]
PythonOperator 可以通过 `provide_context=True` 访问 Airflow 的上下文变量,如 `execution_date`、`dag_run` 等:
def log_context(**kwargs):
ti = kwargs.get("ti") # TaskInstance 对象
dag_run = kwargs.get("dag_run") # DagRun 对象
print(f"Task ID: {ti.task_id}")
print(f"DAG Run ID: {dag_run.run_id}")
with DAG(...) as dag:
context_task = PythonOperator(
task_id="log_context_task",
python_callable=log_context,
provide_context=True,
)
返回值与 XCom[编辑 | 编辑源代码]
PythonOperator 的返回值会自动推送到 Airflow 的 XCom(跨任务通信)系统,供后续任务使用:
def generate_data(**kwargs):
return {"data": [1, 2, 3]}
def process_data(**kwargs):
ti = kwargs.get("ti")
data = ti.xcom_pull(task_ids="generate_data_task")
print(f"Processing data: {data}")
with DAG(...) as dag:
generate_task = PythonOperator(
task_id="generate_data_task",
python_callable=generate_data,
)
process_task = PythonOperator(
task_id="process_data_task",
python_callable=process_data,
provide_context=True,
)
generate_task >> process_task
输出:
Processing data: {'data': [1, 2, 3]}
注意事项[编辑 | 编辑源代码]
- PythonOperator 适用于轻量级任务,长时间运行的任务应考虑使用其他操作符(如 `BashOperator` 或 `KubernetesPodOperator`)。
- 确保 Python 函数是幂等的(多次执行不会产生副作用)。
- 避免在函数中使用全局变量,以防止并发问题。
总结[编辑 | 编辑源代码]
PythonOperator 是 Airflow DAG 开发的核心工具,适用于执行 Python 函数任务。它支持参数传递、上下文访问和 XCom 通信,适合数据处理、API 调用等场景。通过合理使用 PythonOperator,可以构建灵活且可维护的工作流。