跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow工作流管理
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow工作流管理 = '''Apache Airflow''' 是一个开源的工作流自动化和管理平台,用于编程式地编排、调度和监控复杂的数据管道。它由Airbnb开发并于2016年开源,现已成为Hadoop生态中任务调度的核心工具之一。Airflow使用Python编写,通过'''有向无环图(DAG)'''定义任务依赖关系,支持动态生成管道、任务重试、日志追踪等功能。 == 核心概念 == === 1. DAG(有向无环图) === Airflow的核心抽象是DAG,表示一组具有明确依赖关系的任务。例如: <mermaid> graph LR A[数据提取] --> B[数据清洗] B --> C[数据分析] C --> D[报告生成] </mermaid> === 2. Operator === 定义单个任务的执行逻辑,常用类型包括: * '''BashOperator''':执行Shell命令 * '''PythonOperator''':调用Python函数 * '''HiveOperator''':运行Hive查询 === 3. Task & Task Instance === * '''Task''':DAG中的一个节点 * '''Task Instance''':任务的一次具体运行(含执行时间等上下文) == 安装与基础示例 == === 安装 === 通过pip安装: <syntaxhighlight lang="bash"> pip install apache-airflow airflow standalone # 快速启动本地实例 </syntaxhighlight> === 第一个DAG示例 === 创建<code>~/airflow/dags/hello_world.py</code>: <syntaxhighlight lang="python"> from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG( dag_id="hello_world", start_date=datetime(2023, 1, 1), schedule_interval="@daily" ) as dag: task1 = BashOperator( task_id="print_date", bash_command="date" ) task2 = BashOperator( task_id="echo", bash_command="echo 'Hello Airflow!'" ) task1 >> task2 # 定义依赖关系 </syntaxhighlight> '''输出示例'''(通过<code>airflow tasks list hello_world</code>查看): <pre> print_date echo </pre> == 高级特性 == === 动态DAG生成 === 使用Python代码动态创建DAG: <syntaxhighlight lang="python"> for i in range(3): dag_id = f'dynamic_dag_{i}' with DAG(dag_id, schedule_interval='@hourly') as dag: BashOperator( task_id=f'task_{i}', bash_command=f'echo "Processing batch {i}"' ) </syntaxhighlight> === 任务依赖控制 === 复杂依赖关系可通过位运算符实现: <syntaxhighlight lang="python"> task1 >> [task2, task3] # task1先于task2和task3执行 task3 << task4 # task4先于task3执行 </syntaxhighlight> == 实战案例:电商数据处理 == === 场景描述 === 每天处理订单数据: 1. 从HDFS提取数据 2. 用Hive清洗数据 3. 用Spark计算指标 4. 将结果写入MySQL === 实现代码 === <syntaxhighlight lang="python"> from airflow.providers.apache.hive.operators.hive import HiveOperator from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator with DAG("ecommerce_etl", schedule_interval="@daily") as dag: extract = BashOperator( task_id="extract_from_hdfs", bash_command="hdfs dfs -get /raw/orders_{{ ds }}.json /tmp/" ) transform = HiveOperator( task_id="clean_data", hql="LOAD DATA LOCAL INPATH '/tmp/orders_{{ ds }}.json' INTO TABLE orders" ) analyze = SparkSubmitOperator( task_id="calculate_metrics", application="/jobs/sales_analysis.py", application_args=["{{ ds }}"] ) extract >> transform >> analyze </syntaxhighlight> == 监控与调优 == === Web界面 === 访问<code>http://localhost:8080</code>可查看: * DAG运行状态 * 任务持续时间统计 * 日志详情 === 性能调优 === * 使用<code>executor_config</code>分配资源: <syntaxhighlight lang="python"> PythonOperator( task_id="heavy_task", python_callable=process_data, executor_config={"KubernetesExecutor": {"request_memory": "512Mi"}} ) </syntaxhighlight> * 通过<code>pool</code>控制并行任务数 == 数学基础 == Airflow调度器使用贪心算法分配任务,其时间复杂度为: <math>O(n + m)</math> 其中: * <math>n</math> = 待调度任务数 * <math>m</math> = 依赖关系边数 == 常见问题 == {| class="wikitable" |- ! 问题 !! 解决方案 |- | DAG未显示 || 检查<code>airflow.cfg</code>中<code>dags_folder</code>路径 |- | 任务卡住 || 检查<code>airflow scheduler</code>是否运行 |- | 依赖冲突 || 使用<code>pip check</code>验证包兼容性 |} == 总结 == Airflow通过代码定义工作流的方式,完美解决了Hadoop生态中复杂任务调度的问题。其核心优势包括: * '''可视化监控''':内置Web UI * '''可扩展性''':支持自定义Operator * '''可重复性''':任务失败自动重试 建议初学者从官方示例DAG开始,逐步构建真实场景的管道。 [[Category:大数据框架]] [[Category:Apache Hadoop]] [[Category:Apache Hadoop生态工具]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)