跳转到内容

Airflow命令行工具

来自代码酷

Airflow命令行工具[编辑 | 编辑源代码]

Airflow命令行工具是Apache Airflow提供的核心功能之一,允许用户通过终端直接与Airflow进行交互,执行任务调度、DAG管理、任务触发等操作。它是初学者和高级用户管理Airflow工作流的重要工具。

介绍[编辑 | 编辑源代码]

Airflow的命令行接口(CLI)基于Python的`argparse`库构建,提供了丰富的子命令来操作Airflow的各个组件。通过CLI,用户可以:

  • 启动/停止Airflow服务(如Web服务器、调度器)
  • 手动触发或停止DAG运行
  • 检查任务状态和日志
  • 测试单个任务
  • 管理数据库连接和变量

基础命令[编辑 | 编辑源代码]

以下是Airflow CLI的核心命令分类及示例:

服务管理[编辑 | 编辑源代码]

# 启动Web服务器(默认端口8080)
airflow webserver --port 8080

# 启动调度器
airflow scheduler

DAG操作[编辑 | 编辑源代码]

# 列出所有DAGs
airflow dags list

# 手动触发DAG运行
airflow dags trigger --exec-date "2023-01-01" example_dag

# 暂停/取消暂停DAG
airflow dags pause example_dag
airflow dags unpause example_dag

任务操作[编辑 | 编辑源代码]

# 测试特定任务
airflow tasks test example_dag extract_data 2023-01-01

# 查看任务实例状态
airflow tasks states-for-dag-run example_dag 2023-01-01T00:00:00+00:00

高级用法[编辑 | 编辑源代码]

变量管理[编辑 | 编辑源代码]

# 设置变量
airflow variables set my_key "my_value"

# 导出所有变量到JSON文件
airflow variables export variables.json

连接管理[编辑 | 编辑源代码]

# 添加数据库连接
airflow connections add \
    --conn-type postgres \
    --conn-host localhost \
    --conn-login user \
    --conn-password password \
    my_postgres_conn

实际案例[编辑 | 编辑源代码]

场景:调试失败的任务[编辑 | 编辑源代码]

1. 检查失败的任务ID:

airflow tasks list example_dag --tree

2. 查看具体日志:

airflow tasks show example_dag extract_data 2023-01-01

3. 本地测试该任务:

airflow tasks test example_dag extract_data 2023-01-01

场景:批量操作[编辑 | 编辑源代码]

使用xargs批量重试失败的任务:

airflow tasks list example_dag --state failed | xargs -I {} airflow tasks retry example_dag {} 2023-01-01

命令结构图[编辑 | 编辑源代码]

graph TD A[airflow] --> B[webserver] A --> C[scheduler] A --> D[dags] A --> E[tasks] A --> F[variables] A --> G[connections] D --> D1[list] D --> D2[trigger] D --> D3[pause] E --> E1[test] E --> E2[states-for-dag-run]

常见问题[编辑 | 编辑源代码]

Q: 如何查看所有可用命令?

airflow --help

Q: 命令执行无响应? 检查Airflow是否已正确初始化数据库:

airflow db init

最佳实践[编辑 | 编辑源代码]

  • 在生产环境中,建议使用`--daemon`参数后台运行服务:
airflow webserver --daemon
  • 对于复杂操作,可以结合Python API使用:
from airflow.api.client.local_client import Client
client = Client(None, None)
client.trigger_dag(dag_id='example_dag', run_id='manual_001')

数学表达[编辑 | 编辑源代码]

Airflow使用指数退避算法进行任务重试,延迟时间计算: delay=base_delay×2(attempt_number1) 其中base_delay是配置的基础延迟时间,attempt_number是当前重试次数。

总结[编辑 | 编辑源代码]

Airflow命令行工具提供了全面控制工作流的能力,从简单的服务管理到复杂的调试操作。掌握CLI可以显著提高Airflow的使用效率,特别是在开发和故障排除阶段。建议用户定期查阅官方文档以获取最新命令和参数。