Airflow REST API使用
外观
Airflow REST API使用[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Apache Airflow 的 REST API 提供了一种以编程方式与 Airflow 交互的方法,允许用户执行任务如触发 DAG 运行、监控任务状态、管理变量等。REST API 是自动化工作流和集成 Airflow 到其他系统的关键工具,尤其适用于 CI/CD 流水线或外部监控系统。
Airflow 2.0 引入了基于 Flask AppBuilder 的稳定 REST API,取代了旧版的实验性 API。以下内容涵盖 API 的核心功能、认证方式、实际用例及代码示例。
基础配置[编辑 | 编辑源代码]
启用 REST API[编辑 | 编辑源代码]
默认情况下,Airflow 的 REST API 已启用。确保 `airflow.cfg` 中以下配置正确:
[api]
auth_backend = airflow.api.auth.backend.basic_auth
认证方式[编辑 | 编辑源代码]
Airflow 支持多种认证方式,包括: 1. **Basic Auth**:用户名和密码。 2. **JWT**(推荐):通过令牌认证。 3. **OAuth**:与企业身份提供商集成。
生成 JWT 令牌的示例:
curl -X POST \
http://localhost:8080/api/v1/security/login \
-H 'Content-Type: application/json' \
-d '{"username":"admin", "password":"admin"}'
响应示例:
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
}
核心 API 端点[编辑 | 编辑源代码]
以下是常用 API 端点及功能:
端点 | 方法 | 描述 |
---|---|---|
/api/v1/dags |
GET | 列出所有 DAG |
/api/v1/dags/{dag_id}/dagRuns |
POST | 触发 DAG 运行 |
/api/v1/dags/{dag_id}/tasks/{task_id} |
GET | 获取任务状态 |
/api/v1/variables |
POST | 创建或更新变量 |
代码示例[编辑 | 编辑源代码]
触发 DAG 运行[编辑 | 编辑源代码]
import requests
url = "http://localhost:8080/api/v1/dags/example_dag/dagRuns"
headers = {
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"Content-Type": "application/json"
}
data = {
"conf": {"key": "value"} # 可选配置参数
}
response = requests.post(url, headers=headers, json=data)
print(response.json())
输出示例:
{
"dag_run_id": "manual__2023-10-01T12:00:00+00:00",
"dag_id": "example_dag",
"execution_date": "2023-10-01T12:00:00+00:00"
}
监控任务状态[编辑 | 编辑源代码]
url = "http://localhost:8080/api/v1/dags/example_dag/tasks/task_1"
response = requests.get(url, headers=headers)
print(response.json())
输出示例:
{
"task_id": "task_1",
"state": "success",
"start_date": "2023-10-01T12:00:00+00:00",
"end_date": "2023-10-01T12:01:00+00:00"
}
实际案例[编辑 | 编辑源代码]
场景:CI/CD 流水线集成[编辑 | 编辑源代码]
在 CI 阶段完成后,通过 REST API 触发 Airflow DAG 部署新版本:
场景:外部监控系统[编辑 | 编辑源代码]
外部系统定期检查 Airflow 任务状态,失败时发送告警:
def check_task_status():
response = requests.get(task_status_url, headers=headers)
if response.json()["state"] == "failed":
send_alert("Task failed!")
高级功能[编辑 | 编辑源代码]
批量操作[编辑 | 编辑源代码]
使用 /api/v1/dags/~/dagRuns
批量触发多个 DAG。
动态配置传递[编辑 | 编辑源代码]
通过 conf
参数传递动态参数至 DAG:
{
"conf": {"environment": "prod", "version": "1.0.0"}
}
故障排查[编辑 | 编辑源代码]
- **401 Unauthorized**:检查令牌是否过期或无效。
- **404 Not Found**:确认 DAG 或任务 ID 存在。
- **500 Server Error**:查看 Airflow 日志获取详细错误。
总结[编辑 | 编辑源代码]
Airflow REST API 提供了强大的自动化能力,适合从简单任务触发到复杂系统集成的场景。通过结合认证、核心端点和实际案例,用户可以高效地将其融入现有工作流。