跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow PythonOperator
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow PythonOperator = == 概述 == '''PythonOperator''' 是 Apache Airflow 中最常用的操作符之一,允许用户执行任意的 Python 函数作为任务(Task)。它是 DAG(有向无环图)开发的核心组件之一,特别适合需要在工作流中运行 Python 代码的场景。PythonOperator 通过调用用户定义的 Python 函数来完成任务,并支持参数传递、依赖管理和上下文访问。 == 基本语法 == PythonOperator 的基本语法如下: <syntaxhighlight lang="python"> from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def my_python_function(**kwargs): print("Hello from PythonOperator!") return "Task executed successfully" with DAG( dag_id="python_operator_example", start_date=datetime(2023, 1, 1), schedule_interval="@daily", ) as dag: task = PythonOperator( task_id="python_task", python_callable=my_python_function, provide_context=True, # 允许传递上下文(如 `**kwargs`) ) </syntaxhighlight> === 参数说明 === * '''task_id''':任务的唯一标识符。 * '''python_callable''':要执行的 Python 函数。 * '''op_kwargs'''(可选):传递给 `python_callable` 的额外参数(字典形式)。 * '''provide_context'''(可选):如果为 `True`,Airflow 会将任务上下文(如 `execution_date`)作为 `**kwargs` 传递给函数。 == 实际案例 == === 案例 1:简单任务 === 以下示例展示如何使用 PythonOperator 打印当前日期: <syntaxhighlight lang="python"> from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def print_date(**kwargs): execution_date = kwargs.get("execution_date") print(f"Execution date: {execution_date}") return "Date printed" with DAG( dag_id="print_date_dag", start_date=datetime(2023, 1, 1), schedule_interval="@daily", ) as dag: print_task = PythonOperator( task_id="print_date_task", python_callable=print_date, provide_context=True, ) </syntaxhighlight> '''输出:''' <pre> Execution date: 2023-01-01T00:00:00+00:00 </pre> === 案例 2:数据处理任务 === PythonOperator 可用于执行数据转换或分析任务。以下示例计算两个数的和: <syntaxhighlight lang="python"> def calculate_sum(a, b, **kwargs): result = a + b print(f"Sum of {a} and {b} is {result}") return result with DAG( dag_id="calculate_sum_dag", start_date=datetime(2023, 1, 1), ) as dag: sum_task = PythonOperator( task_id="calculate_sum_task", python_callable=calculate_sum, op_kwargs={"a": 5, "b": 3}, ) </syntaxhighlight> '''输出:''' <pre> Sum of 5 and 3 is 8 </pre> == 高级用法 == === 访问 Airflow 上下文 === PythonOperator 可以通过 `provide_context=True` 访问 Airflow 的上下文变量,如 `execution_date`、`dag_run` 等: <syntaxhighlight lang="python"> def log_context(**kwargs): ti = kwargs.get("ti") # TaskInstance 对象 dag_run = kwargs.get("dag_run") # DagRun 对象 print(f"Task ID: {ti.task_id}") print(f"DAG Run ID: {dag_run.run_id}") with DAG(...) as dag: context_task = PythonOperator( task_id="log_context_task", python_callable=log_context, provide_context=True, ) </syntaxhighlight> === 返回值与 XCom === PythonOperator 的返回值会自动推送到 Airflow 的 XCom(跨任务通信)系统,供后续任务使用: <syntaxhighlight lang="python"> def generate_data(**kwargs): return {"data": [1, 2, 3]} def process_data(**kwargs): ti = kwargs.get("ti") data = ti.xcom_pull(task_ids="generate_data_task") print(f"Processing data: {data}") with DAG(...) as dag: generate_task = PythonOperator( task_id="generate_data_task", python_callable=generate_data, ) process_task = PythonOperator( task_id="process_data_task", python_callable=process_data, provide_context=True, ) generate_task >> process_task </syntaxhighlight> '''输出:''' <pre> Processing data: {'data': [1, 2, 3]} </pre> == 注意事项 == * PythonOperator 适用于轻量级任务,长时间运行的任务应考虑使用其他操作符(如 `BashOperator` 或 `KubernetesPodOperator`)。 * 确保 Python 函数是幂等的(多次执行不会产生副作用)。 * 避免在函数中使用全局变量,以防止并发问题。 == 总结 == PythonOperator 是 Airflow DAG 开发的核心工具,适用于执行 Python 函数任务。它支持参数传递、上下文访问和 XCom 通信,适合数据处理、API 调用等场景。通过合理使用 PythonOperator,可以构建灵活且可维护的工作流。 == 参见 == * [[Airflow DAG 开发]] * [[Airflow XCom 机制]] * [[Airflow 任务依赖管理]] [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow DAG开发]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)