Airflow指标收集
外观
Airflow指标收集[编辑 | 编辑源代码]
Airflow指标收集是指通过监控工具或内置机制采集Apache Airflow运行时的性能数据、任务状态和系统资源使用情况的过程。这些指标对于优化工作流、调试故障和保障系统稳定性至关重要。
概述[编辑 | 编辑源代码]
Airflow支持通过多种方式收集指标,包括:
- 内置的StatsD集成
- Prometheus导出器
- 自定义日志处理器
- 数据库查询
指标通常分为以下几类:
- 任务指标:任务执行时长、重试次数、成功率等
- 调度器指标:DAG解析时间、任务排队数量
- 执行器指标:资源使用率(CPU/内存)、工作线程状态
- 系统指标:数据库连接数、消息队列深度
配置指标收集[编辑 | 编辑源代码]
通过StatsD[编辑 | 编辑源代码]
Airflow原生支持StatsD协议,需在`airflow.cfg`中配置:
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
示例Python代码发送自定义指标:
from airflow.stats import Stats
Stats.incr('custom_metric.count') # 计数器递增
Stats.timing('task.duration', 1500) # 记录耗时(毫秒)
Prometheus集成[编辑 | 编辑源代码]
使用`airflow-exporter`将指标转换为Prometheus格式:
pip install apache-airflow[prometheus]
启动后暴露`/metrics`端点,输出示例:
# TYPE airflow_task_fail_count counter
airflow_task_fail_total{dag_id="example_dag", task_id="load_data"} 3
关键指标详解[编辑 | 编辑源代码]
指标名称 | 类型 | 描述 |
---|---|---|
Counter | DAG文件解析总次数 | ||
Counter | 任务实例失败总数 | ||
Gauge | 调度器存活状态(1=活跃) | ||
Gauge | 执行器可用任务槽位 |
可视化案例[编辑 | 编辑源代码]
使用Grafana展示指标(需先配置数据源):
数学建模[编辑 | 编辑源代码]
任务成功率公式: 解析失败 (语法错误): {\displaystyle \text{Success Rate} = \frac{\sum \text{successful\_tasks}}{\sum (\text{successful\_tasks} + \text{failed\_tasks})} \times 100\% }
实际应用场景[编辑 | 编辑源代码]
电商订单处理管道监控: 1. 设置警报规则:当`order_processing.delay`指标超过5秒时触发 2. 跟踪`payment_validation_failures`的日增长率 3. 通过`worker_cpu_usage`动态调整Celery并发数
高级技巧[编辑 | 编辑源代码]
- 使用自定义指标装饰器追踪关键函数:
from airflow.decorators import apply_defaults
def track_metrics(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
Stats.timing(f"{func.__name__}.duration", time.time()-start)
return result
return wrapper
@track_metrics
def process_data(file_path):
# 数据处理逻辑
故障排查指南[编辑 | 编辑源代码]
若指标未显示,检查: 1. 网络连通性(防火墙是否开放StatsD端口) 2. Airflow日志中的`ERROR`级别消息 3. 指标前缀冲突(多个环境共用同一StatsD服务器时)
延伸阅读[编辑 | 编辑源代码]
- Airflow官方文档的[Monitoring]章节
- Prometheus的[Metric Types]规范
- Grafana的[Dashboard Templating]功能