Airflow HttpSensor
外观
Airflow HttpSensor[编辑 | 编辑源代码]
介绍[编辑 | 编辑源代码]
HttpSensor 是 Apache Airflow 中的一个核心传感器(Sensor),用于监控外部 HTTP 端点是否满足特定条件(如返回特定状态码或响应内容)。它属于 Airflow 的传感器类(Sensor),用于在 DAG 执行期间等待某些外部条件成立后再继续执行后续任务。
HttpSensor 的主要功能包括:
- 定期向目标 URL 发送 HTTP 请求(GET 或 POST)。
- 检查响应是否符合预期(如状态码是否为 200,或响应内容是否包含特定字符串)。
- 支持自定义请求头、请求体、超时设置和重试机制。
基本用法[编辑 | 编辑源代码]
以下是一个简单的 HttpSensor 示例,用于监控某个 API 是否可用:
from airflow import DAG
from airflow.sensors.http_sensor import HttpSensor
from datetime import datetime
with DAG(
dag_id="http_sensor_example",
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
) as dag:
check_api_availability = HttpSensor(
task_id="check_api_availability",
http_conn_id="my_http_connection", # 定义在 Airflow Connections 中的 HTTP 连接
endpoint="/api/health", # API 端点路径
request_params={}, # 可选的查询参数
response_check=lambda response: response.status_code == 200,
poke_interval=60, # 每 60 秒检查一次
timeout=300, # 超时时间(秒)
)
参数说明[编辑 | 编辑源代码]
- http_conn_id:在 Airflow 的 Connections 中配置的 HTTP 连接信息(如基础 URL)。
- endpoint:要监控的 API 端点路径(会附加到基础 URL 后)。
- response_check:一个可调用对象,用于验证响应是否符合预期(如状态码或内容检查)。
- poke_interval:两次检查之间的间隔时间(秒)。
- timeout:传感器等待的最大时间(秒),超时后任务标记为失败。
高级配置[编辑 | 编辑源代码]
HttpSensor 支持更复杂的 HTTP 请求配置,例如自定义请求头、请求体和认证方式。
自定义请求头和请求体[编辑 | 编辑源代码]
check_api_with_headers = HttpSensor(
task_id="check_api_with_headers",
http_conn_id="my_http_connection",
endpoint="/api/data",
method="POST", # 使用 POST 方法
headers={"Content-Type": "application/json", "Authorization": "Bearer token123"},
data='{"query": "status"}', # 请求体内容
response_check=lambda response: "ready" in response.text,
)
使用响应内容检查[编辑 | 编辑源代码]
可以通过 `response_check` 函数进一步检查响应内容:
def check_response(response):
# 检查状态码是否为 200 并且响应中包含 "success" 字段
return (
response.status_code == 200
and response.json().get("status") == "success"
)
check_api_content = HttpSensor(
task_id="check_api_content",
http_conn_id="my_http_connection",
endpoint="/api/status",
response_check=check_response,
)
实际案例[编辑 | 编辑源代码]
以下是一个真实场景示例,监控天气 API 是否返回有效数据后再执行后续任务:
from airflow import DAG
from airflow.sensors.http_sensor import HttpSensor
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests
def fetch_weather_data(**kwargs):
# 实际获取天气数据的逻辑
response = requests.get("https://api.weather.com/data")
return response.json()
with DAG(
dag_id="weather_data_pipeline",
start_date=datetime(2023, 1, 1),
schedule_interval="@hourly",
) as dag:
check_weather_api = HttpSensor(
task_id="check_weather_api",
http_conn_id="weather_api_connection",
endpoint="/data",
response_check=lambda response: response.json().get("available") is True,
poke_interval=30,
)
fetch_data = PythonOperator(
task_id="fetch_weather_data",
python_callable=fetch_weather_data,
)
check_weather_api >> fetch_data
流程图[编辑 | 编辑源代码]
常见问题[编辑 | 编辑源代码]
1. 如何处理 HTTPS 和 SSL 验证?[编辑 | 编辑源代码]
可以通过设置 `extra_options` 参数禁用 SSL 验证(仅限测试环境):
HttpSensor(
task_id="insecure_request",
http_conn_id="my_http_connection",
endpoint="/api",
extra_options={"verify": False}, # 禁用 SSL 验证
)
2. 如何设置更复杂的重试逻辑?[编辑 | 编辑源代码]
可以通过 `retries` 和 `retry_delay` 参数配置重试行为:
HttpSensor(
task_id="retry_example",
http_conn_id="my_http_connection",
endpoint="/api",
retries=3, # 最大重试次数
retry_delay=timedelta(minutes=5), # 每次重试间隔
)
总结[编辑 | 编辑源代码]
HttpSensor 是 Airflow 中用于监控 HTTP 服务的强大工具,适用于以下场景:
- 等待外部 API 就绪后再继续执行任务。
- 检查 Web 服务是否返回预期响应。
- 实现工作流中的外部依赖检查。
通过合理配置 `response_check` 和重试参数,可以构建健壮的监控逻辑,确保工作流仅在条件满足时继续执行。