Airflow外部触发
外观
Airflow外部触发[编辑 | 编辑源代码]
Airflow外部触发是指通过Airflow API、命令行工具或Web界面等方式从外部系统手动触发DAG运行的过程。与定时调度不同,外部触发允许用户根据特定事件或条件灵活启动工作流,适用于需要即时响应或条件驱动的场景。
核心概念[编辑 | 编辑源代码]
触发方式对比[编辑 | 编辑源代码]
触发类型 | 描述 | 典型用例 |
---|---|---|
基于预设的cron表达式或时间间隔自动触发 | 每日报表生成 | ||
通过API/CLI/UI手动触发,可传递参数 | 数据到达事件响应 | ||
等待外部条件满足后触发 | 文件到达检测 |
触发参数传递[编辑 | 编辑源代码]
外部触发时可通过conf
参数传递JSON格式的配置,例如:
# 触发时传递参数示例
airflow dags trigger --conf '{"key":"value"}' my_dag
实现方法[编辑 | 编辑源代码]
1. 通过CLI触发[编辑 | 编辑源代码]
使用airflow dags trigger
命令:
# 基本触发
airflow dags trigger -e "2024-01-01" example_dag
# 带参数触发
airflow dags trigger --conf '{"dataset":"sales_Q1"}' sales_analysis
2. 通过REST API触发[编辑 | 编辑源代码]
Airflow 2.0+ 提供稳定API端点:
import requests
response = requests.post(
"http://localhost:8080/api/v1/dags/sample_dag/dagRuns",
headers={"Authorization": "Bearer YOUR_TOKEN"},
json={"conf": {"param1": "value1"}}
)
3. 通过Python代码触发[编辑 | 编辑源代码]
使用Airflow的DagBag
和DagRun
:
from airflow.models import DagBag
dagbag = DagBag()
dag = dagbag.get_dag('example_dag')
dag.create_dagrun(
run_id='manual__2024-01-01T00:00:00',
execution_date=datetime(2024,1,1),
conf={'key': 'value'},
state='running'
)
实际应用案例[编辑 | 编辑源代码]
数据管道触发场景[编辑 | 编辑源代码]
当数据湖中出现新文件时,通过Lambda函数触发DAG:
CI/CD集成示例[编辑 | 编辑源代码]
在部署完成后触发测试DAG:
# GitHub Actions 片段
- name: Trigger Airflow Tests
run: |
curl -X POST \
-H "Authorization: Bearer ${{ secrets.AIRFLOW_API_KEY }}" \
-H "Content-Type: application/json" \
-d '{"conf":{"commit_hash":"${{ github.sha }}"}}' \
http://airflow.example.com/api/v1/dags/run_tests/dagRuns
高级配置[编辑 | 编辑源代码]
触发权限控制[编辑 | 编辑源代码]
在airflow.cfg
中配置API权限:
[api]
auth_backend = airflow.api.auth.backend.basic_auth
自定义触发器[编辑 | 编辑源代码]
创建PythonOperator作为触发端点:
from airflow.decorators import dag, task
from datetime import datetime
@dag(start_date=datetime(2023,1,1), schedule=None)
def webhook_trigger():
@task
def process_webhook(conf=None):
print(f"Received payload: {conf}")
webhook_dag = webhook_trigger()
最佳实践[编辑 | 编辑源代码]
- 为外部触发设置独立的DAG文件
- 验证传入参数的完整性
- 记录触发来源和上下文
- 限制高频触发权限
- 使用
execution_timeout
防止长时间运行
常见问题[编辑 | 编辑源代码]
Q: 外部触发会跳过之前的未执行任务吗?
A: 不会,每个DagRun
都是独立实例,除非设置catchup=True
Q: 如何追踪触发来源?
A: 通过conf
传入source
字段或在日志中检查triggered_by
信息
Q: 最大并发触发数如何控制?
A: 通过DAG的max_active_runs
参数限制
数学表达[编辑 | 编辑源代码]
外部触发的执行时间公式: 其中:
- = 触发时间
- = 调度器队列延迟
参见[编辑 | 编辑源代码]
- Airflow官方文档中的TriggerDagRunOperator
- REST API参考指南中的dagRuns端点
- 安全配置中的认证和授权章节