跳转到内容

Airflow REST API使用

来自代码酷

Airflow REST API使用[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Apache Airflow 的 REST API 提供了一种以编程方式与 Airflow 交互的方法,允许用户执行任务如触发 DAG 运行、监控任务状态、管理变量等。REST API 是自动化工作流和集成 Airflow 到其他系统的关键工具,尤其适用于 CI/CD 流水线或外部监控系统。

Airflow 2.0 引入了基于 Flask AppBuilder 的稳定 REST API,取代了旧版的实验性 API。以下内容涵盖 API 的核心功能、认证方式、实际用例及代码示例。

基础配置[编辑 | 编辑源代码]

启用 REST API[编辑 | 编辑源代码]

默认情况下,Airflow 的 REST API 已启用。确保 `airflow.cfg` 中以下配置正确:

  
[api]  
auth_backend = airflow.api.auth.backend.basic_auth

认证方式[编辑 | 编辑源代码]

Airflow 支持多种认证方式,包括: 1. **Basic Auth**:用户名和密码。 2. **JWT**(推荐):通过令牌认证。 3. **OAuth**:与企业身份提供商集成。

生成 JWT 令牌的示例:

  
curl -X POST \  
  http://localhost:8080/api/v1/security/login \  
  -H 'Content-Type: application/json' \  
  -d '{"username":"admin", "password":"admin"}'

响应示例:

  
{  
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",  
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."  
}

核心 API 端点[编辑 | 编辑源代码]

以下是常用 API 端点及功能:

**Airflow REST API 核心端点**
端点 方法 描述
/api/v1/dags GET 列出所有 DAG
/api/v1/dags/{dag_id}/dagRuns POST 触发 DAG 运行
/api/v1/dags/{dag_id}/tasks/{task_id} GET 获取任务状态
/api/v1/variables POST 创建或更新变量

代码示例[编辑 | 编辑源代码]

触发 DAG 运行[编辑 | 编辑源代码]

  
import requests  

url = "http://localhost:8080/api/v1/dags/example_dag/dagRuns"  
headers = {  
    "Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",  
    "Content-Type": "application/json"  
}  
data = {  
    "conf": {"key": "value"}  # 可选配置参数  
}  

response = requests.post(url, headers=headers, json=data)  
print(response.json())

输出示例:

  
{  
  "dag_run_id": "manual__2023-10-01T12:00:00+00:00",  
  "dag_id": "example_dag",  
  "execution_date": "2023-10-01T12:00:00+00:00"  
}

监控任务状态[编辑 | 编辑源代码]

  
url = "http://localhost:8080/api/v1/dags/example_dag/tasks/task_1"  
response = requests.get(url, headers=headers)  
print(response.json())

输出示例:

  
{  
  "task_id": "task_1",  
  "state": "success",  
  "start_date": "2023-10-01T12:00:00+00:00",  
  "end_date": "2023-10-01T12:01:00+00:00"  
}

实际案例[编辑 | 编辑源代码]

场景:CI/CD 流水线集成[编辑 | 编辑源代码]

在 CI 阶段完成后,通过 REST API 触发 Airflow DAG 部署新版本:

graph LR A[CI Pipeline] -->|POST /dagRuns| B[Airflow] B --> C[Deploy DAG] C --> D[Notify Slack]

场景:外部监控系统[编辑 | 编辑源代码]

外部系统定期检查 Airflow 任务状态,失败时发送告警:

  
def check_task_status():  
    response = requests.get(task_status_url, headers=headers)  
    if response.json()["state"] == "failed":  
        send_alert("Task failed!")

高级功能[编辑 | 编辑源代码]

批量操作[编辑 | 编辑源代码]

使用 /api/v1/dags/~/dagRuns 批量触发多个 DAG。

动态配置传递[编辑 | 编辑源代码]

通过 conf 参数传递动态参数至 DAG:

  
{  
  "conf": {"environment": "prod", "version": "1.0.0"}  
}

故障排查[编辑 | 编辑源代码]

  • **401 Unauthorized**:检查令牌是否过期或无效。
  • **404 Not Found**:确认 DAG 或任务 ID 存在。
  • **500 Server Error**:查看 Airflow 日志获取详细错误。

总结[编辑 | 编辑源代码]

Airflow REST API 提供了强大的自动化能力,适合从简单任务触发到复杂系统集成的场景。通过结合认证、核心端点和实际案例,用户可以高效地将其融入现有工作流。