跳转到内容

Airflow工作流管理

来自代码酷

Airflow工作流管理[编辑 | 编辑源代码]

Apache Airflow 是一个开源的工作流自动化和管理平台,用于编程式地编排、调度和监控复杂的数据管道。它由Airbnb开发并于2016年开源,现已成为Hadoop生态中任务调度的核心工具之一。Airflow使用Python编写,通过有向无环图(DAG)定义任务依赖关系,支持动态生成管道、任务重试、日志追踪等功能。

核心概念[编辑 | 编辑源代码]

1. DAG(有向无环图)[编辑 | 编辑源代码]

Airflow的核心抽象是DAG,表示一组具有明确依赖关系的任务。例如:

graph LR A[数据提取] --> B[数据清洗] B --> C[数据分析] C --> D[报告生成]

2. Operator[编辑 | 编辑源代码]

定义单个任务的执行逻辑,常用类型包括:

  • BashOperator:执行Shell命令
  • PythonOperator:调用Python函数
  • HiveOperator:运行Hive查询

3. Task & Task Instance[编辑 | 编辑源代码]

  • Task:DAG中的一个节点
  • Task Instance:任务的一次具体运行(含执行时间等上下文)

安装与基础示例[编辑 | 编辑源代码]

安装[编辑 | 编辑源代码]

通过pip安装:

pip install apache-airflow
airflow standalone  # 快速启动本地实例

第一个DAG示例[编辑 | 编辑源代码]

创建~/airflow/dags/hello_world.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="hello_world",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily"
) as dag:
    task1 = BashOperator(
        task_id="print_date",
        bash_command="date"
    )
    task2 = BashOperator(
        task_id="echo",
        bash_command="echo 'Hello Airflow!'"
    )
    task1 >> task2  # 定义依赖关系

输出示例(通过airflow tasks list hello_world查看):

print_date
echo

高级特性[编辑 | 编辑源代码]

动态DAG生成[编辑 | 编辑源代码]

使用Python代码动态创建DAG:

for i in range(3):
    dag_id = f'dynamic_dag_{i}'
    with DAG(dag_id, schedule_interval='@hourly') as dag:
        BashOperator(
            task_id=f'task_{i}',
            bash_command=f'echo "Processing batch {i}"'
        )

任务依赖控制[编辑 | 编辑源代码]

复杂依赖关系可通过位运算符实现:

task1 >> [task2, task3]  # task1先于task2和task3执行
task3 << task4  # task4先于task3执行

实战案例:电商数据处理[编辑 | 编辑源代码]

场景描述[编辑 | 编辑源代码]

每天处理订单数据: 1. 从HDFS提取数据 2. 用Hive清洗数据 3. 用Spark计算指标 4. 将结果写入MySQL

实现代码[编辑 | 编辑源代码]

from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG("ecommerce_etl", schedule_interval="@daily") as dag:
    extract = BashOperator(
        task_id="extract_from_hdfs",
        bash_command="hdfs dfs -get /raw/orders_{{ ds }}.json /tmp/"
    )
    
    transform = HiveOperator(
        task_id="clean_data",
        hql="LOAD DATA LOCAL INPATH '/tmp/orders_{{ ds }}.json' INTO TABLE orders"
    )
    
    analyze = SparkSubmitOperator(
        task_id="calculate_metrics",
        application="/jobs/sales_analysis.py",
        application_args=["{{ ds }}"]
    )
    
    extract >> transform >> analyze

监控与调优[编辑 | 编辑源代码]

Web界面[编辑 | 编辑源代码]

访问http://localhost:8080可查看:

  • DAG运行状态
  • 任务持续时间统计
  • 日志详情

性能调优[编辑 | 编辑源代码]

  • 使用executor_config分配资源:
PythonOperator(
    task_id="heavy_task",
    python_callable=process_data,
    executor_config={"KubernetesExecutor": {"request_memory": "512Mi"}}
)
  • 通过pool控制并行任务数

数学基础[编辑 | 编辑源代码]

Airflow调度器使用贪心算法分配任务,其时间复杂度为: O(n+m) 其中:

  • n = 待调度任务数
  • m = 依赖关系边数

常见问题[编辑 | 编辑源代码]

问题 解决方案
DAG未显示 检查airflow.cfgdags_folder路径
任务卡住 检查airflow scheduler是否运行
依赖冲突 使用pip check验证包兼容性

总结[编辑 | 编辑源代码]

Airflow通过代码定义工作流的方式,完美解决了Hadoop生态中复杂任务调度的问题。其核心优势包括:

  • 可视化监控:内置Web UI
  • 可扩展性:支持自定义Operator
  • 可重复性:任务失败自动重试

建议初学者从官方示例DAG开始,逐步构建真实场景的管道。