跳转到内容

Airflow外部触发

来自代码酷

Airflow外部触发[编辑 | 编辑源代码]

Airflow外部触发是指通过Airflow API、命令行工具或Web界面等方式从外部系统手动触发DAG运行的过程。与定时调度不同,外部触发允许用户根据特定事件或条件灵活启动工作流,适用于需要即时响应或条件驱动的场景。

核心概念[编辑 | 编辑源代码]

触发方式对比[编辑 | 编辑源代码]

触发类型 描述 典型用例
基于预设的cron表达式或时间间隔自动触发 | 每日报表生成
通过API/CLI/UI手动触发,可传递参数 | 数据到达事件响应
等待外部条件满足后触发 | 文件到达检测

触发参数传递[编辑 | 编辑源代码]

外部触发时可通过conf参数传递JSON格式的配置,例如:

# 触发时传递参数示例
airflow dags trigger --conf '{"key":"value"}' my_dag

实现方法[编辑 | 编辑源代码]

1. 通过CLI触发[编辑 | 编辑源代码]

使用airflow dags trigger命令:

# 基本触发
airflow dags trigger -e "2024-01-01" example_dag

# 带参数触发
airflow dags trigger --conf '{"dataset":"sales_Q1"}' sales_analysis

2. 通过REST API触发[编辑 | 编辑源代码]

Airflow 2.0+ 提供稳定API端点:

import requests

response = requests.post(
    "http://localhost:8080/api/v1/dags/sample_dag/dagRuns",
    headers={"Authorization": "Bearer YOUR_TOKEN"},
    json={"conf": {"param1": "value1"}}
)

3. 通过Python代码触发[编辑 | 编辑源代码]

使用Airflow的DagBagDagRun

from airflow.models import DagBag
dagbag = DagBag()
dag = dagbag.get_dag('example_dag')
dag.create_dagrun(
    run_id='manual__2024-01-01T00:00:00',
    execution_date=datetime(2024,1,1),
    conf={'key': 'value'},
    state='running'
)

实际应用案例[编辑 | 编辑源代码]

数据管道触发场景[编辑 | 编辑源代码]

当数据湖中出现新文件时,通过Lambda函数触发DAG:

sequenceDiagram participant S3 as S3 Bucket participant Lambda as AWS Lambda participant Airflow S3->>Lambda: 文件上传事件 Lambda->>Airflow: POST /api/v1/dags/ingest/dagRuns Airflow->>Airflow: 启动数据摄取DAG

CI/CD集成示例[编辑 | 编辑源代码]

在部署完成后触发测试DAG:

# GitHub Actions 片段
- name: Trigger Airflow Tests
  run: |
    curl -X POST \
      -H "Authorization: Bearer ${{ secrets.AIRFLOW_API_KEY }}" \
      -H "Content-Type: application/json" \
      -d '{"conf":{"commit_hash":"${{ github.sha }}"}}' \
      http://airflow.example.com/api/v1/dags/run_tests/dagRuns

高级配置[编辑 | 编辑源代码]

触发权限控制[编辑 | 编辑源代码]

airflow.cfg中配置API权限:

[api]
auth_backend = airflow.api.auth.backend.basic_auth

自定义触发器[编辑 | 编辑源代码]

创建PythonOperator作为触发端点:

from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2023,1,1), schedule=None)
def webhook_trigger():
    @task
    def process_webhook(conf=None):
        print(f"Received payload: {conf}")

webhook_dag = webhook_trigger()

最佳实践[编辑 | 编辑源代码]

  • 为外部触发设置独立的DAG文件
  • 验证传入参数的完整性
  • 记录触发来源和上下文
  • 限制高频触发权限
  • 使用execution_timeout防止长时间运行

常见问题[编辑 | 编辑源代码]

Q: 外部触发会跳过之前的未执行任务吗?
A: 不会,每个DagRun都是独立实例,除非设置catchup=True

Q: 如何追踪触发来源?
A: 通过conf传入source字段或在日志中检查triggered_by信息

Q: 最大并发触发数如何控制?
A: 通过DAG的max_active_runs参数限制

数学表达[编辑 | 编辑源代码]

外部触发的执行时间公式: Texec=Ttrigger+Δqueue 其中:

  • Ttrigger = 触发时间
  • Δqueue = 调度器队列延迟

参见[编辑 | 编辑源代码]

  • Airflow官方文档中的TriggerDagRunOperator
  • REST API参考指南中的dagRuns端点
  • 安全配置中的认证和授权章节