跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow = '''Apache Airflow''' 是一个开源的工作流自动化和调度系统,由Airbnb创建并捐赠给Apache软件基金会。它使用Python编写,允许用户以编程方式创建、调度和监控复杂的工作流(称为DAGs,有向无环图)。Airflow特别适合处理ETL(提取、转换、加载)流程、机器学习流水线、数据仓库操作等需要复杂调度的场景。 == 核心概念 == === DAG(有向无环图) === Airflow的核心抽象是'''DAG'''(Directed Acyclic Graph),它表示一组任务及其依赖关系。DAG定义工作流的整体结构,但不关心单个任务的具体执行。 === 操作符(Operators) === 操作符定义单个任务的工作。Airflow提供多种内置操作符: * '''BashOperator''' - 执行bash命令 * '''PythonOperator''' - 调用Python函数 * '''EmailOperator''' - 发送电子邮件 * '''SimpleHttpOperator''' - 发送HTTP请求 === 任务(Tasks) === 任务是DAG中的基本执行单元,每个任务都是操作符的一个实例。 === 调度器(Scheduler) === 调度器负责触发任务执行,根据DAG定义和调度间隔决定何时运行任务。 == 安装与配置 == === 基本安装 === 使用pip安装Airflow: <syntaxhighlight lang="bash"> pip install apache-airflow </syntaxhighlight> === 初始化数据库 === Airflow需要一个元数据库来存储其状态: <syntaxhighlight lang="bash"> airflow db init </syntaxhighlight> === 启动Web服务器 === 启动Airflow的Web界面: <syntaxhighlight lang="bash"> airflow webserver --port 8080 </syntaxhighlight> === 启动调度器 === 启动调度器进程: <syntaxhighlight lang="bash"> airflow scheduler </syntaxhighlight> == 创建第一个DAG == 以下是一个简单的DAG示例,包含两个任务: <syntaxhighlight lang="python"> from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator def print_hello(): print("Hello from Airflow!") with DAG( 'my_first_dag', start_date=datetime(2023, 1, 1), schedule_interval='@daily' ) as dag: task1 = BashOperator( task_id='print_date', bash_command='date' ) task2 = PythonOperator( task_id='print_hello', python_callable=print_hello ) task1 >> task2 # 设置任务依赖关系 </syntaxhighlight> == 高级特性 == === 钩子(Hooks) === 钩子是Airflow与外部系统交互的接口,如数据库、云服务等。例如: * '''PostgresHook''' - 连接PostgreSQL数据库 * '''HttpHook''' - 发送HTTP请求 * '''S3Hook''' - 与AWS S3交互 === 传感器(Sensors) === 传感器是一种特殊类型的操作符,它会等待某些条件为真才执行。例如: * '''FileSensor''' - 等待文件出现 * '''SqlSensor''' - 等待SQL查询返回特定结果 === XComs === XComs(Cross-Communication)允许任务之间交换小量数据。 == 实际应用案例 == === 数据管道 === Airflow常用于构建数据管道,从多个源提取数据,进行转换后加载到目标系统。 === 机器学习流水线 === 机器学习模型训练和部署流程可以建模为DAG,包含数据准备、特征工程、模型训练和评估等步骤。 === 基础设施管理 === Airflow可以调度基础设施相关的任务,如备份、日志轮转、系统健康检查等。 == 与其他技术的比较 == {| class="wikitable" |- ! 工具 !! 主要特点 !! 适用场景 |- | [[Airflow]] || 基于Python,编程式定义工作流 || 复杂调度,数据工程 |- | [[Jenkins]] || 基于UI配置,CI/CD专注 || 软件构建和部署 |- | [[Luigi]] || 轻量级,Python编写 || 简单数据管道 |- | [[Oozie]] || Hadoop生态系统集成 || Hadoop工作流 |} == 最佳实践 == * 保持DAG文件简洁,将复杂逻辑封装在单独模块中 * 使用有意义的任务ID和DAG名称 * 合理设置重试策略和超时 * 避免在DAG文件中存储敏感信息,使用Airflow的Connections和Variables * 为DAG添加文档字符串,解释其目的和结构 == 常见问题 == === 如何处理任务失败? === Airflow提供多种机制处理失败: * 自动重试(retries参数) * 警报通知(如EmailOperator) * 自定义回调函数 === 如何扩展Airflow? === 可以通过以下方式扩展: * 编写自定义操作符 * 创建插件 * 使用Executor扩展执行能力 == 社区与资源 == * 官方文档: https://airflow.apache.org/ * GitHub仓库: https://github.com/apache/airflow * Slack社区: https://apache-airflow.slack.com/ [[Category:数据科学与人工智能]] [[Category:工作流管理系统]] [[Category:开源软件]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)