Airflow架构概述
外观
Airflow架构概述[编辑 | 编辑源代码]
Apache Airflow是一个用于编排复杂计算工作流和数据处理管道的平台。其核心设计原则是“以代码定义工作流”,通过有向无环图(DAG)描述任务依赖关系。本章将深入解析Airflow的核心架构组件及其协作方式。
核心组件[编辑 | 编辑源代码]
Airflow架构由以下关键模块组成:
1. 元数据库(Metadata Database)[编辑 | 编辑源代码]
存储DAG定义、任务状态、变量、连接等元数据。默认使用SQLite(仅限开发环境),生产环境推荐PostgreSQL/MySQL。
2. 调度器(Scheduler)[编辑 | 编辑源代码]
负责:
- 解析DAG文件
- 检查任务依赖关系
- 触发满足条件的任务
- 将任务实例发送到执行队列
3. 执行器(Executor)[编辑 | 编辑源代码]
决定任务如何运行,常见类型:
- LocalExecutor:本地进程池
- CeleryExecutor:分布式任务队列
- KubernetesExecutor:Kubernetes Pod
4. 工作节点(Worker)[编辑 | 编辑源代码]
实际执行任务的进程(Celery/K8s模式下为独立节点)
5. Web服务器[编辑 | 编辑源代码]
提供UI界面,展示DAG状态、日志、任务历史等
数据流示例[编辑 | 编辑源代码]
# 示例:简单DAG定义
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_hello():
return "Hello World!"
with DAG(
dag_id="hello_world",
start_date=datetime(2023, 1, 1),
schedule="@daily"
) as dag:
task = PythonOperator(
task_id="print_hello",
python_callable=print_hello
)
# 输出说明:
# 1. 该DAG会被调度器扫描并存入元数据库
# 2. 满足时间条件后生成DAG Run和Task Instance
# 3. 执行器分配Worker运行任务
调度机制[编辑 | 编辑源代码]
Airflow使用时间窗口调度模型:
- 调度间隔(schedule_interval)决定运行频率
- 执行日期(execution_date)标记数据批次
- 计算公式:
开始时间 | 间隔 | 实际执行时间 | 说明 |
---|---|---|---|
2023-01-01 00:00 | @daily | 2023-01-02 00:00 | 处理2023-01-01的数据 |
2023-01-01 00:00 | */2 hours | 2023-01-01 02:00 | 每两小时执行 |
实际应用场景[编辑 | 编辑源代码]
电商数据分析管道: 1. 每日0点触发订单数据抽取(PythonOperator) 2. 数据清洗完成后启动用户行为分析(SparkOperator) 3. 最后生成可视化报表并邮件通知(EmailOperator)
高级特性[编辑 | 编辑源代码]
- Backfilling:重新处理历史数据
- XComs:任务间小数据传递
- Hooks:外部系统连接抽象
- Pools:限制并发资源
# XCom使用示例
def push_data(**context):
context['ti'].xcom_push(key='sample', value=42)
def pull_data(**context):
value = context['ti'].xcom_pull(key='sample')
print(f"Received: {value}")
push_task = PythonOperator(task_id='push', python_callable=push_data)
pull_task = PythonOperator(task_id='pull', python_callable=pull_data)
常见问题[编辑 | 编辑源代码]
Q: 为什么任务状态没有更新? A: 检查调度器是否运行,Worker是否存活,元数据库连接是否正常
Q: 如何提高调度精度? A: 调整调度器参数:
- scheduler_heartbeat_sec
- dag_dir_list_interval
性能优化建议[编辑 | 编辑源代码]
- 使用DAG序列化加速调度器解析
- 为高频任务设置优先级权重
- 避免在DAG文件中进行重型初始化