跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow TaskFlow API
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow TaskFlow API = == 介绍 == '''Airflow TaskFlow API''' 是 Apache Airflow 2.0 引入的一种高级编程接口,旨在简化 DAG(有向无环图)的定义和任务之间的数据传递。它基于 Python 装饰器(`@task`),允许用户以更直观、函数式的方式编写任务逻辑,并自动处理 XComs(跨任务通信)的推送和拉取。TaskFlow API 特别适合数据管道开发,减少了传统 Operator 的样板代码,同时保持了 Airflow 的灵活性和可扩展性。 TaskFlow API 的核心优势包括: * 减少显式 XComs 操作 * 更自然的函数式编程风格 * 自动依赖管理 * 与 Python 类型提示(Type Hints)集成 == 基本用法 == 以下是一个简单的 TaskFlow API 示例,展示如何定义任务并传递数据: <syntaxhighlight lang="python"> from airflow.decorators import dag, task from datetime import datetime @dag(schedule_interval=None, start_date=datetime(2023, 1, 1), catchup=False) def taskflow_example(): @task def extract() -> dict: return {"data": [1, 2, 3]} @task def transform(data: dict) -> dict: data["data"] = [x * 2 for x in data["data"]] return data @task def load(data: dict): print(f"Processed data: {data}") load(transform(extract())) dag = taskflow_example() </syntaxhighlight> '''代码解析:''' 1. 使用 `@dag` 装饰器定义 DAG 2. 每个任务函数用 `@task` 装饰器标记 3. 函数的返回值自动通过 XComs 传递 4. 任务依赖关系通过函数调用链 `load(transform(extract()))` 隐式定义 == XComs 的自动处理 == 在传统 Airflow 中,XComs 需要显式调用 `xcom_push` 和 `xcom_pull`。TaskFlow API 自动完成这些操作: * 函数返回值自动推送到 XComs * 下游任务通过函数参数自动拉取 XComs 值 * 类型提示(如 `data: dict`)帮助 Airflow 验证数据类型 === 多返回值处理 === TaskFlow API 支持返回多个值(通过元组或字典),下游任务可以按需接收: <syntaxhighlight lang="python"> @task def extract_multiple() -> tuple: return [1, 2, 3], {"status": "ok"} @task def process(numbers: list, metadata: dict): print(f"Numbers: {numbers}, Metadata: {metadata}") process(*extract_multiple()) </syntaxhighlight> == 高级特性 == === 任务依赖的显式控制 === 除函数调用外,可以使用 `>>` 运算符定义依赖: <syntaxhighlight lang="python"> extract_task = extract() transform_task = transform(extract_task) load_task = load(transform_task) extract_task >> transform_task >> load_task </syntaxhighlight> === 与传统 Operator 混合使用 === TaskFlow 任务可以与标准 Operator 共存: <syntaxhighlight lang="python"> from airflow.operators.python import PythonOperator @task def process_data(data): return data * 2 def traditional_task(**context): print("Traditional operator running") with DAG(...) as dag: taskflow_result = process_data(42) PythonOperator( task_id="traditional", python_callable=traditional_task, op_kwargs={"input": taskflow_result} ) </syntaxhighlight> === 动态任务映射 === Airflow 2.3+ 支持动态生成并行任务: <syntaxhighlight lang="python"> @task def process_item(item): return item * 2 @task def extract_items() -> list: return [1, 2, 3, 4] process_item.expand(item=extract_items()) </syntaxhighlight> == 实际案例:数据ETL管道 == 以下是一个完整的数据处理管道示例: <syntaxhighlight lang="python"> from airflow.decorators import dag, task from datetime import datetime import pandas as pd @dag(schedule="@daily", start_date=datetime(2023, 1, 1)) def sales_pipeline(): @task(retries=3) def fetch_sales(date: datetime) -> pd.DataFrame: # 模拟从数据库获取数据 return pd.DataFrame({ "date": [date.date()], "revenue": [1000 * (date.day % 7 + 1)] }) @task def calculate_stats(df: pd.DataFrame) -> dict: return { "total": df["revenue"].sum(), "average": df["revenue"].mean() } @task def store_report(stats: dict): print(f"Storing report: {stats}") sales_data = fetch_sales("{{ ds }}") stats = calculate_stats(sales_data) store_report(stats) dag = sales_pipeline() </syntaxhighlight> '''关键点:''' * 使用 Pandas DataFrame 作为数据类型 * 任务自动重试机制(`retries=3`) * 模板变量 `{{ ds }}` 的用法 * 完整的类型安全数据流 == 性能考量 == 使用 TaskFlow API 时需注意: 1. '''XComs 大小限制''':默认序列化为 JSON,大型数据应使用外部存储(如 S3) 2. '''序列化开销''':复杂对象可能需要自定义序列化 3. '''任务映射内存消耗''':动态生成大量任务时需监控资源 == 可视化 == 以下 Mermaid 图展示示例 DAG 的任务流: <mermaid> graph LR A[extract] --> B[transform] B --> C[load] </mermaid> == 数学表达 == 对于数据转换任务,可以形式化描述为: <math> f_{transform}(x) = g \circ f_{extract}(x) </math> 其中: * <math>f_{extract}</math> 是提取任务 * <math>g</math> 是转换逻辑 * <math>\circ</math> 表示函数组合 == 总结 == TaskFlow API 显著提升了 Airflow 的开发体验: * 减少约 40% 的样板代码(基于典型 ETL 管道) * 更符合 Python 开发者的直觉 * 保持与传统 Operator 的兼容性 * 通过类型提示提高代码可靠性 建议新项目优先采用 TaskFlow API,现有项目可以逐步迁移。对于复杂场景,可结合传统 Operator 实现最佳灵活性。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow XComs与任务通信]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)