跳转到内容

Airflow API触发

来自代码酷

Airflow API触发[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow API触发是指通过Airflow的REST API接口手动触发DAG(有向无环图)运行的技术。与依赖调度器自动触发不同,API触发提供了灵活的外部控制能力,适用于以下场景:

  • 外部系统(如CI/CD管道)需要按需启动工作流
  • 人工干预测试或紧急任务执行
  • 与其他服务集成实现事件驱动架构

Airflow 2.0+ 提供了稳定的REST API(基于Flask AppBuilder),取代了旧版的实验性API。

API端点概览[编辑 | 编辑源代码]

关键端点(需认证):

HTTP方法 端点 功能
POST /api/v1/dags/{dag_id}/dagRuns 触发指定DAG的新运行
GET /api/v1/dags/{dag_id}/dagRuns 列出DAG的所有运行记录

认证方式[编辑 | 编辑源代码]

Airflow API支持多种认证:

  • Basic Auth(用户名/密码)
  • JWT(JSON Web Token)
  • OAuth(企业版)

配置示例(airflow.cfg):

  
[api]  
auth_backend = airflow.api.auth.backend.basic_auth

触发DAG运行[编辑 | 编辑源代码]

基本请求示例[编辑 | 编辑源代码]

使用curl触发DAG:

  
curl -X POST \  
  "http://localhost:8080/api/v1/dags/example_dag/dagRuns" \  
  -H "Content-Type: application/json" \  
  -H "Authorization: Basic $(echo -n 'admin:admin' | base64)" \  
  -d '{"conf": {"key": "value"}}'

参数说明

  • conf: 传递给DAG的配置字典(可选)
  • logical_date: 指定逻辑日期(ISO 8601格式,可选)

Python客户端示例[编辑 | 编辑源代码]

使用官方Python客户端:

  
from airflow.api.client.local_client import Client  

client = Client(api_base_url='http://localhost:8080')  
response = client.trigger_dag(  
    dag_id='example_dag',  
    run_id='manual__2023-01-01T00:00:00',  
    conf={'param': 'test'}  
)  
print(response)

输出示例

  
{  
  "conf": {"param": "test"},  
  "dag_id": "example_dag",  
  "dag_run_id": "manual__2023-01-01T00:00:00",  
  "end_date": null,  
  "logical_date": "2023-01-01T00:00:00+00:00",  
  "state": "running"  
}

状态检查[编辑 | 编辑源代码]

通过API检查DAG运行状态:

  
curl -X GET \  
  "http://localhost:8080/api/v1/dags/example_dag/dagRuns" \  
  -H "Authorization: Basic ..."

响应包含state字段,可能值:

  • queued
  • running
  • success
  • failed

实际案例[编辑 | 编辑源代码]

场景:CI/CD管道集成[编辑 | 编辑源代码]

当GitHub代码库收到推送时,通过Webhook触发Airflow测试DAG:

sequenceDiagram participant GitHub participant WebhookServer participant AirflowAPI GitHub->>WebhookServer: POST push event WebhookServer->>AirflowAPI: POST /api/v1/dags/test_pipeline/dagRuns AirflowAPI-->>WebhookServer: 202 Accepted WebhookServer-->>GitHub: 200 OK

代码实现[编辑 | 编辑源代码]

Webhook处理器(Python Flask示例):

  
@app.route('/webhook', methods=['POST'])  
def handle_webhook():  
    if request.headers.get('X-GitHub-Event') == 'push':  
        response = requests.post(  
            'http://airflow-server/api/v1/dags/test_pipeline/dagRuns',  
            headers={'Authorization': 'Bearer YOUR_TOKEN'},  
            json={"conf": {"commit_id": request.json['after']}}  
        )  
        return response.json(), response.status_code

高级配置[编辑 | 编辑源代码]

自定义触发逻辑[编辑 | 编辑源代码]

在DAG文件中添加触发条件检查:

  
from airflow.decorators import dag  

@dag(schedule_interval=None)  
def api_triggered_dag():  
    @task  
    def check_trigger_params(conf):  
        if 'required_key' not in conf:  
            raise ValueError("Missing required parameter")  

    check_trigger_params(conf="{{ dag_run.conf }}")

安全加固[编辑 | 编辑源代码]

1. 启用HTTPS 2. 限制IP访问(Web服务器配置) 3. 使用短期有效的JWT令牌

常见问题[编辑 | 编辑源代码]

Q: 触发请求返回403错误 A: 检查: 1. 用户是否具有"DAG Edit"权限 2. 认证凭据是否过期

Q: DAG未按预期执行 A: 检查: 1. is_paused_upon_creation参数 2. 调度器是否正常运行

数学表示[编辑 | 编辑源代码]

API触发可视为函数映射:

解析失败 (语法错误): {\displaystyle \text{API\_Trigger}: (DAG\_ID \times Config) \rightarrow \text{DAG\_Run\_State} }

其中:

  • ConfigKey-Value Pairs
  • 解析失败 (语法错误): {\displaystyle \text{DAG\_Run\_State} \in \{\text{queued}, \text{running}, \text{success}, \text{failed}\}}

总结[编辑 | 编辑源代码]

Airflow API触发提供了强大的外部集成能力,适合需要动态控制工作流的场景。通过合理设计认证机制和参数传递,可以实现安全的自动化触发流程。