Airflow API触发
Airflow API触发[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow API触发是指通过Airflow的REST API接口手动触发DAG(有向无环图)运行的技术。与依赖调度器自动触发不同,API触发提供了灵活的外部控制能力,适用于以下场景:
- 外部系统(如CI/CD管道)需要按需启动工作流
- 人工干预测试或紧急任务执行
- 与其他服务集成实现事件驱动架构
Airflow 2.0+ 提供了稳定的REST API(基于Flask AppBuilder),取代了旧版的实验性API。
API端点概览[编辑 | 编辑源代码]
关键端点(需认证):
HTTP方法 | 端点 | 功能 |
---|---|---|
POST | /api/v1/dags/{dag_id}/dagRuns |
触发指定DAG的新运行 |
GET | /api/v1/dags/{dag_id}/dagRuns |
列出DAG的所有运行记录 |
认证方式[编辑 | 编辑源代码]
Airflow API支持多种认证:
- Basic Auth(用户名/密码)
- JWT(JSON Web Token)
- OAuth(企业版)
配置示例(airflow.cfg
):
[api]
auth_backend = airflow.api.auth.backend.basic_auth
触发DAG运行[编辑 | 编辑源代码]
基本请求示例[编辑 | 编辑源代码]
使用curl
触发DAG:
curl -X POST \
"http://localhost:8080/api/v1/dags/example_dag/dagRuns" \
-H "Content-Type: application/json" \
-H "Authorization: Basic $(echo -n 'admin:admin' | base64)" \
-d '{"conf": {"key": "value"}}'
参数说明:
conf
: 传递给DAG的配置字典(可选)logical_date
: 指定逻辑日期(ISO 8601格式,可选)
Python客户端示例[编辑 | 编辑源代码]
使用官方Python客户端:
from airflow.api.client.local_client import Client
client = Client(api_base_url='http://localhost:8080')
response = client.trigger_dag(
dag_id='example_dag',
run_id='manual__2023-01-01T00:00:00',
conf={'param': 'test'}
)
print(response)
输出示例:
{
"conf": {"param": "test"},
"dag_id": "example_dag",
"dag_run_id": "manual__2023-01-01T00:00:00",
"end_date": null,
"logical_date": "2023-01-01T00:00:00+00:00",
"state": "running"
}
状态检查[编辑 | 编辑源代码]
通过API检查DAG运行状态:
curl -X GET \
"http://localhost:8080/api/v1/dags/example_dag/dagRuns" \
-H "Authorization: Basic ..."
响应包含state
字段,可能值:
queued
running
success
failed
实际案例[编辑 | 编辑源代码]
场景:CI/CD管道集成[编辑 | 编辑源代码]
当GitHub代码库收到推送时,通过Webhook触发Airflow测试DAG:
代码实现[编辑 | 编辑源代码]
Webhook处理器(Python Flask示例):
@app.route('/webhook', methods=['POST'])
def handle_webhook():
if request.headers.get('X-GitHub-Event') == 'push':
response = requests.post(
'http://airflow-server/api/v1/dags/test_pipeline/dagRuns',
headers={'Authorization': 'Bearer YOUR_TOKEN'},
json={"conf": {"commit_id": request.json['after']}}
)
return response.json(), response.status_code
高级配置[编辑 | 编辑源代码]
自定义触发逻辑[编辑 | 编辑源代码]
在DAG文件中添加触发条件检查:
from airflow.decorators import dag
@dag(schedule_interval=None)
def api_triggered_dag():
@task
def check_trigger_params(conf):
if 'required_key' not in conf:
raise ValueError("Missing required parameter")
check_trigger_params(conf="{{ dag_run.conf }}")
安全加固[编辑 | 编辑源代码]
1. 启用HTTPS 2. 限制IP访问(Web服务器配置) 3. 使用短期有效的JWT令牌
常见问题[编辑 | 编辑源代码]
Q: 触发请求返回403错误 A: 检查: 1. 用户是否具有"DAG Edit"权限 2. 认证凭据是否过期
Q: DAG未按预期执行
A: 检查:
1. is_paused_upon_creation
参数
2. 调度器是否正常运行
数学表示[编辑 | 编辑源代码]
API触发可视为函数映射:
解析失败 (语法错误): {\displaystyle \text{API\_Trigger}: (DAG\_ID \times Config) \rightarrow \text{DAG\_Run\_State} }
其中:
- 解析失败 (语法错误): {\displaystyle \text{DAG\_Run\_State} \in \{\text{queued}, \text{running}, \text{success}, \text{failed}\}}
总结[编辑 | 编辑源代码]
Airflow API触发提供了强大的外部集成能力,适合需要动态控制工作流的场景。通过合理设计认证机制和参数传递,可以实现安全的自动化触发流程。