Airflow手动触发
外观
Airflow手动触发[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
Airflow手动触发是指通过用户主动操作(而非依赖调度器自动执行)来启动DAG(有向无环图)运行的过程。在Apache Airflow中,DAG通常按预设的时间表自动触发,但在调试、紧急任务执行或特殊场景下,手动触发功能尤为重要。
主要应用场景包括:
- 开发/测试阶段验证DAG逻辑
- 生产环境中处理突发数据需求
- 跳过正常调度周期执行紧急任务
- 重新运行失败的任务实例
触发方式[编辑 | 编辑源代码]
Airflow提供三种主要手动触发方式:
1. Web UI触发[编辑 | 编辑源代码]
通过Airflow的Web界面操作: 1. 导航到DAGs列表页面 2. 点击目标DAG右侧的Trigger Dag按钮 3. 在弹出窗口中配置参数(可选) 4. 确认触发
2. CLI命令触发[编辑 | 编辑源代码]
使用airflow dags trigger
命令:
# 基本语法
airflow dags trigger -e <execution_date> <dag_id>
# 实际示例(触发2023-01-01运行的example_dag)
airflow dags trigger -e "2023-01-01T00:00:00" example_dag
3. API调用触发[编辑 | 编辑源代码]
通过REST API端点/api/v1/dags/{dag_id}/dagRuns
:
import requests
from datetime import datetime
url = "http://localhost:8080/api/v1/dags/example_dag/dagRuns"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer YOUR_TOKEN"
}
payload = {
"execution_date": datetime.utcnow().isoformat() + "Z",
"conf": {"key": "value"} # 可选配置
}
response = requests.post(url, json=payload, headers=headers)
print(response.json())
配置参数[编辑 | 编辑源代码]
手动触发时可指定以下参数:
参数 | 类型 | 描述 | 示例 |
---|---|---|---|
execution_date |
datetime | 逻辑执行时间 | 2023-01-01T00:00:00 |
conf |
JSON | 传递给DAG的配置 | {"environment": "test"}
|
run_id |
string | 自定义运行标识 | manual_run_001 |
实际案例[编辑 | 编辑源代码]
场景:紧急数据补录[编辑 | 编辑源代码]
某电商平台因系统故障丢失2023-06-18的订单数据,需要手动触发ETL流程:
# 补录DAG示例(dag_id: orders_etl)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def process_orders(**context):
execution_date = context["execution_date"]
print(f"Processing orders for {execution_date}")
dag = DAG(
'orders_etl',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1)
task = PythonOperator(
task_id='process_orders',
python_callable=process_orders,
dag=dag)
手动触发命令:
airflow dags trigger -e "2023-06-18T00:00:00" orders_etl \
--conf '{"priority": "high", "source": "backup"}'
执行流程[编辑 | 编辑源代码]
注意事项[编辑 | 编辑源代码]
- 执行日期逻辑:手动触发的
execution_date
应与DAG的start_date
和schedule_interval
兼容 - 并发控制:检查
max_active_runs
参数避免超限 - 依赖关系:手动触发会跳过
depends_on_past
检查(可通过-i
参数禁用此行为) - 任务状态:手动运行与自动调度的运行在系统中同等对待
高级配置[编辑 | 编辑源代码]
使用TriggerDagRunOperator[编辑 | 编辑源代码]
实现DAG之间的手动触发:
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_task = TriggerDagRunOperator(
task_id='trigger_target_dag',
trigger_dag_id="target_dag",
execution_date="{{ execution_date }}",
conf={"message": "Triggered from parent DAG"},
dag=dag
)
自定义触发逻辑[编辑 | 编辑源代码]
通过DAG文件的Python代码实现条件触发:
def _check_trigger_condition(**context):
# 添加自定义触发逻辑
return some_condition
with DAG(...) as dag:
condition = PythonOperator(
task_id='check_condition',
python_callable=_check_trigger_condition
)
trigger = TriggerDagRunOperator(
task_id='conditional_trigger',
trigger_dag_id="target_dag",
python_callable=_check_trigger_condition
)
condition >> trigger
常见问题[编辑 | 编辑源代码]
Q:手动触发与回填(backfill)有何区别? A:手动触发创建单个DAG运行,而回填会为指定时间范围内的每个调度间隔创建运行实例。
Q:如何避免手动触发影响生产环境?
A:建议:
1. 在测试环境验证DAG
2. 使用--dry-run
参数模拟运行
3. 设置is_paused_upon_creation=True
创建暂停的DAG
Q:手动触发运行会出现在哪个时间轴上?
A:在Airflow UI的Graph/Tree视图中,手动触发运行会出现在其execution_date
对应的时间位置。