跳转到内容

Airflow HttpSensor

来自代码酷

Airflow HttpSensor[编辑 | 编辑源代码]

介绍[编辑 | 编辑源代码]

HttpSensor 是 Apache Airflow 中的一个核心传感器(Sensor),用于监控外部 HTTP 端点是否满足特定条件(如返回特定状态码或响应内容)。它属于 Airflow 的传感器类(Sensor),用于在 DAG 执行期间等待某些外部条件成立后再继续执行后续任务。

HttpSensor 的主要功能包括:

  • 定期向目标 URL 发送 HTTP 请求(GET 或 POST)。
  • 检查响应是否符合预期(如状态码是否为 200,或响应内容是否包含特定字符串)。
  • 支持自定义请求头、请求体、超时设置和重试机制。

基本用法[编辑 | 编辑源代码]

以下是一个简单的 HttpSensor 示例,用于监控某个 API 是否可用:

from airflow import DAG
from airflow.sensors.http_sensor import HttpSensor
from datetime import datetime

with DAG(
    dag_id="http_sensor_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    check_api_availability = HttpSensor(
        task_id="check_api_availability",
        http_conn_id="my_http_connection",  # 定义在 Airflow Connections 中的 HTTP 连接
        endpoint="/api/health",             # API 端点路径
        request_params={},                  # 可选的查询参数
        response_check=lambda response: response.status_code == 200,
        poke_interval=60,                   # 每 60 秒检查一次
        timeout=300,                        # 超时时间(秒)
    )

参数说明[编辑 | 编辑源代码]

  • http_conn_id:在 Airflow 的 Connections 中配置的 HTTP 连接信息(如基础 URL)。
  • endpoint:要监控的 API 端点路径(会附加到基础 URL 后)。
  • response_check:一个可调用对象,用于验证响应是否符合预期(如状态码或内容检查)。
  • poke_interval:两次检查之间的间隔时间(秒)。
  • timeout:传感器等待的最大时间(秒),超时后任务标记为失败。

高级配置[编辑 | 编辑源代码]

HttpSensor 支持更复杂的 HTTP 请求配置,例如自定义请求头、请求体和认证方式。

自定义请求头和请求体[编辑 | 编辑源代码]

check_api_with_headers = HttpSensor(
    task_id="check_api_with_headers",
    http_conn_id="my_http_connection",
    endpoint="/api/data",
    method="POST",                          # 使用 POST 方法
    headers={"Content-Type": "application/json", "Authorization": "Bearer token123"},
    data='{"query": "status"}',            # 请求体内容
    response_check=lambda response: "ready" in response.text,
)

使用响应内容检查[编辑 | 编辑源代码]

可以通过 `response_check` 函数进一步检查响应内容:

def check_response(response):
    # 检查状态码是否为 200 并且响应中包含 "success" 字段
    return (
        response.status_code == 200
        and response.json().get("status") == "success"
    )

check_api_content = HttpSensor(
    task_id="check_api_content",
    http_conn_id="my_http_connection",
    endpoint="/api/status",
    response_check=check_response,
)

实际案例[编辑 | 编辑源代码]

以下是一个真实场景示例,监控天气 API 是否返回有效数据后再执行后续任务:

from airflow import DAG
from airflow.sensors.http_sensor import HttpSensor
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests

def fetch_weather_data(**kwargs):
    # 实际获取天气数据的逻辑
    response = requests.get("https://api.weather.com/data")
    return response.json()

with DAG(
    dag_id="weather_data_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@hourly",
) as dag:
    check_weather_api = HttpSensor(
        task_id="check_weather_api",
        http_conn_id="weather_api_connection",
        endpoint="/data",
        response_check=lambda response: response.json().get("available") is True,
        poke_interval=30,
    )

    fetch_data = PythonOperator(
        task_id="fetch_weather_data",
        python_callable=fetch_weather_data,
    )

    check_weather_api >> fetch_data

流程图[编辑 | 编辑源代码]

graph LR A[开始] --> B[HttpSensor: 检查天气API] B --> C{API可用?} C -->|是| D[PythonOperator: 获取数据] C -->|否| B

常见问题[编辑 | 编辑源代码]

1. 如何处理 HTTPS 和 SSL 验证?[编辑 | 编辑源代码]

可以通过设置 `extra_options` 参数禁用 SSL 验证(仅限测试环境):

HttpSensor(
    task_id="insecure_request",
    http_conn_id="my_http_connection",
    endpoint="/api",
    extra_options={"verify": False},  # 禁用 SSL 验证
)

2. 如何设置更复杂的重试逻辑?[编辑 | 编辑源代码]

可以通过 `retries` 和 `retry_delay` 参数配置重试行为:

HttpSensor(
    task_id="retry_example",
    http_conn_id="my_http_connection",
    endpoint="/api",
    retries=3,                      # 最大重试次数
    retry_delay=timedelta(minutes=5), # 每次重试间隔
)

总结[编辑 | 编辑源代码]

HttpSensor 是 Airflow 中用于监控 HTTP 服务的强大工具,适用于以下场景:

  • 等待外部 API 就绪后再继续执行任务。
  • 检查 Web 服务是否返回预期响应。
  • 实现工作流中的外部依赖检查。

通过合理配置 `response_check` 和重试参数,可以构建健壮的监控逻辑,确保工作流仅在条件满足时继续执行。