Airflow命令行工具
外观
Airflow命令行工具[编辑 | 编辑源代码]
Airflow命令行工具是Apache Airflow提供的核心功能之一,允许用户通过终端直接与Airflow进行交互,执行任务调度、DAG管理、任务触发等操作。它是初学者和高级用户管理Airflow工作流的重要工具。
介绍[编辑 | 编辑源代码]
Airflow的命令行接口(CLI)基于Python的`argparse`库构建,提供了丰富的子命令来操作Airflow的各个组件。通过CLI,用户可以:
- 启动/停止Airflow服务(如Web服务器、调度器)
- 手动触发或停止DAG运行
- 检查任务状态和日志
- 测试单个任务
- 管理数据库连接和变量
基础命令[编辑 | 编辑源代码]
以下是Airflow CLI的核心命令分类及示例:
服务管理[编辑 | 编辑源代码]
# 启动Web服务器(默认端口8080)
airflow webserver --port 8080
# 启动调度器
airflow scheduler
DAG操作[编辑 | 编辑源代码]
# 列出所有DAGs
airflow dags list
# 手动触发DAG运行
airflow dags trigger --exec-date "2023-01-01" example_dag
# 暂停/取消暂停DAG
airflow dags pause example_dag
airflow dags unpause example_dag
任务操作[编辑 | 编辑源代码]
# 测试特定任务
airflow tasks test example_dag extract_data 2023-01-01
# 查看任务实例状态
airflow tasks states-for-dag-run example_dag 2023-01-01T00:00:00+00:00
高级用法[编辑 | 编辑源代码]
变量管理[编辑 | 编辑源代码]
# 设置变量
airflow variables set my_key "my_value"
# 导出所有变量到JSON文件
airflow variables export variables.json
连接管理[编辑 | 编辑源代码]
# 添加数据库连接
airflow connections add \
--conn-type postgres \
--conn-host localhost \
--conn-login user \
--conn-password password \
my_postgres_conn
实际案例[编辑 | 编辑源代码]
场景:调试失败的任务[编辑 | 编辑源代码]
1. 检查失败的任务ID:
airflow tasks list example_dag --tree
2. 查看具体日志:
airflow tasks show example_dag extract_data 2023-01-01
3. 本地测试该任务:
airflow tasks test example_dag extract_data 2023-01-01
场景:批量操作[编辑 | 编辑源代码]
使用xargs批量重试失败的任务:
airflow tasks list example_dag --state failed | xargs -I {} airflow tasks retry example_dag {} 2023-01-01
命令结构图[编辑 | 编辑源代码]
常见问题[编辑 | 编辑源代码]
Q: 如何查看所有可用命令?
airflow --help
Q: 命令执行无响应? 检查Airflow是否已正确初始化数据库:
airflow db init
最佳实践[编辑 | 编辑源代码]
- 在生产环境中,建议使用`--daemon`参数后台运行服务:
airflow webserver --daemon
- 对于复杂操作,可以结合Python API使用:
from airflow.api.client.local_client import Client
client = Client(None, None)
client.trigger_dag(dag_id='example_dag', run_id='manual_001')
数学表达[编辑 | 编辑源代码]
Airflow使用指数退避算法进行任务重试,延迟时间计算: 其中是配置的基础延迟时间,是当前重试次数。
总结[编辑 | 编辑源代码]
Airflow命令行工具提供了全面控制工作流的能力,从简单的服务管理到复杂的调试操作。掌握CLI可以显著提高Airflow的使用效率,特别是在开发和故障排除阶段。建议用户定期查阅官方文档以获取最新命令和参数。