跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow REST API使用
”︁
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow REST API使用 = == 介绍 == Apache Airflow 的 REST API 提供了一种以编程方式与 Airflow 交互的方法,允许用户执行任务如触发 DAG 运行、监控任务状态、管理变量等。REST API 是自动化工作流和集成 Airflow 到其他系统的关键工具,尤其适用于 CI/CD 流水线或外部监控系统。 Airflow 2.0 引入了基于 Flask AppBuilder 的稳定 REST API,取代了旧版的实验性 API。以下内容涵盖 API 的核心功能、认证方式、实际用例及代码示例。 == 基础配置 == === 启用 REST API === 默认情况下,Airflow 的 REST API 已启用。确保 `airflow.cfg` 中以下配置正确: <syntaxhighlight lang="ini"> [api] auth_backend = airflow.api.auth.backend.basic_auth </syntaxhighlight> === 认证方式 === Airflow 支持多种认证方式,包括: 1. **Basic Auth**:用户名和密码。 2. **JWT**(推荐):通过令牌认证。 3. **OAuth**:与企业身份提供商集成。 生成 JWT 令牌的示例: <syntaxhighlight lang="bash"> curl -X POST \ http://localhost:8080/api/v1/security/login \ -H 'Content-Type: application/json' \ -d '{"username":"admin", "password":"admin"}' </syntaxhighlight> 响应示例: <syntaxhighlight lang="json"> { "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." } </syntaxhighlight> == 核心 API 端点 == 以下是常用 API 端点及功能: {| class="wikitable" |+ **Airflow REST API 核心端点** ! 端点 !! 方法 !! 描述 |- | <code>/api/v1/dags</code> || GET || 列出所有 DAG |- | <code>/api/v1/dags/{dag_id}/dagRuns</code> || POST || 触发 DAG 运行 |- | <code>/api/v1/dags/{dag_id}/tasks/{task_id}</code> || GET || 获取任务状态 |- | <code>/api/v1/variables</code> || POST || 创建或更新变量 |} == 代码示例 == === 触发 DAG 运行 === <syntaxhighlight lang="python"> import requests url = "http://localhost:8080/api/v1/dags/example_dag/dagRuns" headers = { "Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", "Content-Type": "application/json" } data = { "conf": {"key": "value"} # 可选配置参数 } response = requests.post(url, headers=headers, json=data) print(response.json()) </syntaxhighlight> 输出示例: <syntaxhighlight lang="json"> { "dag_run_id": "manual__2023-10-01T12:00:00+00:00", "dag_id": "example_dag", "execution_date": "2023-10-01T12:00:00+00:00" } </syntaxhighlight> === 监控任务状态 === <syntaxhighlight lang="python"> url = "http://localhost:8080/api/v1/dags/example_dag/tasks/task_1" response = requests.get(url, headers=headers) print(response.json()) </syntaxhighlight> 输出示例: <syntaxhighlight lang="json"> { "task_id": "task_1", "state": "success", "start_date": "2023-10-01T12:00:00+00:00", "end_date": "2023-10-01T12:01:00+00:00" } </syntaxhighlight> == 实际案例 == === 场景:CI/CD 流水线集成 === 在 CI 阶段完成后,通过 REST API 触发 Airflow DAG 部署新版本: <mermaid> graph LR A[CI Pipeline] -->|POST /dagRuns| B[Airflow] B --> C[Deploy DAG] C --> D[Notify Slack] </mermaid> === 场景:外部监控系统 === 外部系统定期检查 Airflow 任务状态,失败时发送告警: <syntaxhighlight lang="python"> def check_task_status(): response = requests.get(task_status_url, headers=headers) if response.json()["state"] == "failed": send_alert("Task failed!") </syntaxhighlight> == 高级功能 == === 批量操作 === 使用 <code>/api/v1/dags/~/dagRuns</code> 批量触发多个 DAG。 === 动态配置传递 === 通过 <code>conf</code> 参数传递动态参数至 DAG: <syntaxhighlight lang="json"> { "conf": {"environment": "prod", "version": "1.0.0"} } </syntaxhighlight> == 故障排查 == * **401 Unauthorized**:检查令牌是否过期或无效。 * **404 Not Found**:确认 DAG 或任务 ID 存在。 * **500 Server Error**:查看 Airflow 日志获取详细错误。 == 总结 == Airflow REST API 提供了强大的自动化能力,适合从简单任务触发到复杂系统集成的场景。通过结合认证、核心端点和实际案例,用户可以高效地将其融入现有工作流。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow调度与触发]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)