跳转到内容

Airflow手动触发

来自代码酷

Airflow手动触发[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

Airflow手动触发是指通过用户主动操作(而非依赖调度器自动执行)来启动DAG(有向无环图)运行的过程。在Apache Airflow中,DAG通常按预设的时间表自动触发,但在调试、紧急任务执行或特殊场景下,手动触发功能尤为重要。

主要应用场景包括:

  • 开发/测试阶段验证DAG逻辑
  • 生产环境中处理突发数据需求
  • 跳过正常调度周期执行紧急任务
  • 重新运行失败的任务实例

触发方式[编辑 | 编辑源代码]

Airflow提供三种主要手动触发方式:

1. Web UI触发[编辑 | 编辑源代码]

通过Airflow的Web界面操作: 1. 导航到DAGs列表页面 2. 点击目标DAG右侧的Trigger Dag按钮 3. 在弹出窗口中配置参数(可选) 4. 确认触发

2. CLI命令触发[编辑 | 编辑源代码]

使用airflow dags trigger命令:

# 基本语法
airflow dags trigger -e <execution_date> <dag_id>

# 实际示例(触发2023-01-01运行的example_dag)
airflow dags trigger -e "2023-01-01T00:00:00" example_dag

3. API调用触发[编辑 | 编辑源代码]

通过REST API端点/api/v1/dags/{dag_id}/dagRuns

import requests
from datetime import datetime

url = "http://localhost:8080/api/v1/dags/example_dag/dagRuns"
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer YOUR_TOKEN"
}
payload = {
    "execution_date": datetime.utcnow().isoformat() + "Z",
    "conf": {"key": "value"}  # 可选配置
}

response = requests.post(url, json=payload, headers=headers)
print(response.json())

配置参数[编辑 | 编辑源代码]

手动触发时可指定以下参数:

触发参数说明
参数 类型 描述 示例
execution_date datetime 逻辑执行时间 2023-01-01T00:00:00
conf JSON 传递给DAG的配置 {"environment": "test"}
run_id string 自定义运行标识 manual_run_001

实际案例[编辑 | 编辑源代码]

场景:紧急数据补录[编辑 | 编辑源代码]

某电商平台因系统故障丢失2023-06-18的订单数据,需要手动触发ETL流程:

# 补录DAG示例(dag_id: orders_etl)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_orders(**context):
    execution_date = context["execution_date"]
    print(f"Processing orders for {execution_date}")

dag = DAG(
    'orders_etl',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1)
    
task = PythonOperator(
    task_id='process_orders',
    python_callable=process_orders,
    dag=dag)

手动触发命令

airflow dags trigger -e "2023-06-18T00:00:00" orders_etl \
--conf '{"priority": "high", "source": "backup"}'

执行流程[编辑 | 编辑源代码]

sequenceDiagram participant User participant WebUI/CLI/API participant Scheduler participant Executor User->>WebUI/CLI/API: 触发请求 WebUI/CLI/API->>Scheduler: 创建DagRun Scheduler->>Executor: 分配任务 Executor->>Scheduler: 执行状态更新 Scheduler->>WebUI/CLI/API: 返回结果 WebUI/CLI/API->>User: 显示运行状态

注意事项[编辑 | 编辑源代码]

  • 执行日期逻辑:手动触发的execution_date应与DAG的start_dateschedule_interval兼容
  • 并发控制:检查max_active_runs参数避免超限
  • 依赖关系:手动触发会跳过depends_on_past检查(可通过-i参数禁用此行为)
  • 任务状态:手动运行与自动调度的运行在系统中同等对待

高级配置[编辑 | 编辑源代码]

使用TriggerDagRunOperator[编辑 | 编辑源代码]

实现DAG之间的手动触发:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_task = TriggerDagRunOperator(
    task_id='trigger_target_dag',
    trigger_dag_id="target_dag",
    execution_date="{{ execution_date }}",
    conf={"message": "Triggered from parent DAG"},
    dag=dag
)

自定义触发逻辑[编辑 | 编辑源代码]

通过DAG文件的Python代码实现条件触发:

def _check_trigger_condition(**context):
    # 添加自定义触发逻辑
    return some_condition

with DAG(...) as dag:
    condition = PythonOperator(
        task_id='check_condition',
        python_callable=_check_trigger_condition
    )
    
    trigger = TriggerDagRunOperator(
        task_id='conditional_trigger',
        trigger_dag_id="target_dag",
        python_callable=_check_trigger_condition
    )
    
    condition >> trigger

常见问题[编辑 | 编辑源代码]

Q:手动触发与回填(backfill)有何区别? A:手动触发创建单个DAG运行,而回填会为指定时间范围内的每个调度间隔创建运行实例。

Q:如何避免手动触发影响生产环境? A:建议: 1. 在测试环境验证DAG 2. 使用--dry-run参数模拟运行 3. 设置is_paused_upon_creation=True创建暂停的DAG

Q:手动触发运行会出现在哪个时间轴上? A:在Airflow UI的Graph/Tree视图中,手动触发运行会出现在其execution_date对应的时间位置。