Airflow工作流管理
外观
Airflow工作流管理[编辑 | 编辑源代码]
Apache Airflow 是一个开源的工作流自动化和管理平台,用于编程式地编排、调度和监控复杂的数据管道。它由Airbnb开发并于2016年开源,现已成为Hadoop生态中任务调度的核心工具之一。Airflow使用Python编写,通过有向无环图(DAG)定义任务依赖关系,支持动态生成管道、任务重试、日志追踪等功能。
核心概念[编辑 | 编辑源代码]
1. DAG(有向无环图)[编辑 | 编辑源代码]
Airflow的核心抽象是DAG,表示一组具有明确依赖关系的任务。例如:
2. Operator[编辑 | 编辑源代码]
定义单个任务的执行逻辑,常用类型包括:
- BashOperator:执行Shell命令
- PythonOperator:调用Python函数
- HiveOperator:运行Hive查询
3. Task & Task Instance[编辑 | 编辑源代码]
- Task:DAG中的一个节点
- Task Instance:任务的一次具体运行(含执行时间等上下文)
安装与基础示例[编辑 | 编辑源代码]
安装[编辑 | 编辑源代码]
通过pip安装:
pip install apache-airflow
airflow standalone # 快速启动本地实例
第一个DAG示例[编辑 | 编辑源代码]
创建~/airflow/dags/hello_world.py
:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG(
dag_id="hello_world",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily"
) as dag:
task1 = BashOperator(
task_id="print_date",
bash_command="date"
)
task2 = BashOperator(
task_id="echo",
bash_command="echo 'Hello Airflow!'"
)
task1 >> task2 # 定义依赖关系
输出示例(通过airflow tasks list hello_world
查看):
print_date echo
高级特性[编辑 | 编辑源代码]
动态DAG生成[编辑 | 编辑源代码]
使用Python代码动态创建DAG:
for i in range(3):
dag_id = f'dynamic_dag_{i}'
with DAG(dag_id, schedule_interval='@hourly') as dag:
BashOperator(
task_id=f'task_{i}',
bash_command=f'echo "Processing batch {i}"'
)
任务依赖控制[编辑 | 编辑源代码]
复杂依赖关系可通过位运算符实现:
task1 >> [task2, task3] # task1先于task2和task3执行
task3 << task4 # task4先于task3执行
实战案例:电商数据处理[编辑 | 编辑源代码]
场景描述[编辑 | 编辑源代码]
每天处理订单数据: 1. 从HDFS提取数据 2. 用Hive清洗数据 3. 用Spark计算指标 4. 将结果写入MySQL
实现代码[编辑 | 编辑源代码]
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
with DAG("ecommerce_etl", schedule_interval="@daily") as dag:
extract = BashOperator(
task_id="extract_from_hdfs",
bash_command="hdfs dfs -get /raw/orders_{{ ds }}.json /tmp/"
)
transform = HiveOperator(
task_id="clean_data",
hql="LOAD DATA LOCAL INPATH '/tmp/orders_{{ ds }}.json' INTO TABLE orders"
)
analyze = SparkSubmitOperator(
task_id="calculate_metrics",
application="/jobs/sales_analysis.py",
application_args=["{{ ds }}"]
)
extract >> transform >> analyze
监控与调优[编辑 | 编辑源代码]
Web界面[编辑 | 编辑源代码]
访问http://localhost:8080
可查看:
- DAG运行状态
- 任务持续时间统计
- 日志详情
性能调优[编辑 | 编辑源代码]
- 使用
executor_config
分配资源:
PythonOperator(
task_id="heavy_task",
python_callable=process_data,
executor_config={"KubernetesExecutor": {"request_memory": "512Mi"}}
)
- 通过
pool
控制并行任务数
数学基础[编辑 | 编辑源代码]
Airflow调度器使用贪心算法分配任务,其时间复杂度为: 其中:
- = 待调度任务数
- = 依赖关系边数
常见问题[编辑 | 编辑源代码]
问题 | 解决方案 |
---|---|
DAG未显示 | 检查airflow.cfg 中dags_folder 路径
|
任务卡住 | 检查airflow scheduler 是否运行
|
依赖冲突 | 使用pip check 验证包兼容性
|
总结[编辑 | 编辑源代码]
Airflow通过代码定义工作流的方式,完美解决了Hadoop生态中复杂任务调度的问题。其核心优势包括:
- 可视化监控:内置Web UI
- 可扩展性:支持自定义Operator
- 可重复性:任务失败自动重试
建议初学者从官方示例DAG开始,逐步构建真实场景的管道。