跳转到内容

Airflow架构概述

来自代码酷

Airflow架构概述[编辑 | 编辑源代码]

Apache Airflow是一个用于编排复杂计算工作流和数据处理管道的平台。其核心设计原则是“以代码定义工作流”,通过有向无环图(DAG)描述任务依赖关系。本章将深入解析Airflow的核心架构组件及其协作方式。

核心组件[编辑 | 编辑源代码]

Airflow架构由以下关键模块组成:

1. 元数据库(Metadata Database)[编辑 | 编辑源代码]

存储DAG定义、任务状态、变量、连接等元数据。默认使用SQLite(仅限开发环境),生产环境推荐PostgreSQL/MySQL。

2. 调度器(Scheduler)[编辑 | 编辑源代码]

负责:

  • 解析DAG文件
  • 检查任务依赖关系
  • 触发满足条件的任务
  • 将任务实例发送到执行队列

3. 执行器(Executor)[编辑 | 编辑源代码]

决定任务如何运行,常见类型:

  • LocalExecutor:本地进程池
  • CeleryExecutor:分布式任务队列
  • KubernetesExecutor:Kubernetes Pod

4. 工作节点(Worker)[编辑 | 编辑源代码]

实际执行任务的进程(Celery/K8s模式下为独立节点)

5. Web服务器[编辑 | 编辑源代码]

提供UI界面,展示DAG状态、日志、任务历史等

graph TD A[用户编写DAG文件] --> B[元数据库] B --> C[调度器] C --> D[执行器] D --> E[工作节点] E --> F[元数据库] C --> G[Web服务器] G --> B

数据流示例[编辑 | 编辑源代码]

# 示例:简单DAG定义
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_hello():
    return "Hello World!"

with DAG(
    dag_id="hello_world",
    start_date=datetime(2023, 1, 1),
    schedule="@daily"
) as dag:
    
    task = PythonOperator(
        task_id="print_hello",
        python_callable=print_hello
    )

# 输出说明:
# 1. 该DAG会被调度器扫描并存入元数据库
# 2. 满足时间条件后生成DAG Run和Task Instance
# 3. 执行器分配Worker运行任务

调度机制[编辑 | 编辑源代码]

Airflow使用时间窗口调度模型:

  • 调度间隔(schedule_interval)决定运行频率
  • 执行日期(execution_date)标记数据批次
  • 计算公式:execution_date=start_date+schedule_interval
调度示例
开始时间 间隔 实际执行时间 说明
2023-01-01 00:00 @daily 2023-01-02 00:00 处理2023-01-01的数据
2023-01-01 00:00 */2 hours 2023-01-01 02:00 每两小时执行

实际应用场景[编辑 | 编辑源代码]

电商数据分析管道: 1. 每日0点触发订单数据抽取(PythonOperator) 2. 数据清洗完成后启动用户行为分析(SparkOperator) 3. 最后生成可视化报表并邮件通知(EmailOperator)

graph LR A[订单数据抽取] --> B[数据清洗] B --> C[用户行为分析] C --> D[生成报表] D --> E[发送邮件]

高级特性[编辑 | 编辑源代码]

  • Backfilling:重新处理历史数据
  • XComs:任务间小数据传递
  • Hooks:外部系统连接抽象
  • Pools:限制并发资源
# XCom使用示例
def push_data(**context):
    context['ti'].xcom_push(key='sample', value=42)

def pull_data(**context):
    value = context['ti'].xcom_pull(key='sample')
    print(f"Received: {value}")

push_task = PythonOperator(task_id='push', python_callable=push_data)
pull_task = PythonOperator(task_id='pull', python_callable=pull_data)

常见问题[编辑 | 编辑源代码]

Q: 为什么任务状态没有更新? A: 检查调度器是否运行,Worker是否存活,元数据库连接是否正常

Q: 如何提高调度精度? A: 调整调度器参数:

  • scheduler_heartbeat_sec
  • dag_dir_list_interval

性能优化建议[编辑 | 编辑源代码]

  • 使用DAG序列化加速调度器解析
  • 为高频任务设置优先级权重
  • 避免在DAG文件中进行重型初始化