跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow HttpSensor
”︁(章节)
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow HttpSensor = == 介绍 == '''HttpSensor''' 是 Apache Airflow 中的一个核心传感器(Sensor),用于监控外部 HTTP 端点是否满足特定条件(如返回特定状态码或响应内容)。它属于 Airflow 的传感器类(Sensor),用于在 DAG 执行期间等待某些外部条件成立后再继续执行后续任务。 HttpSensor 的主要功能包括: * 定期向目标 URL 发送 HTTP 请求(GET 或 POST)。 * 检查响应是否符合预期(如状态码是否为 200,或响应内容是否包含特定字符串)。 * 支持自定义请求头、请求体、超时设置和重试机制。 == 基本用法 == 以下是一个简单的 HttpSensor 示例,用于监控某个 API 是否可用: <syntaxhighlight lang="python"> from airflow import DAG from airflow.sensors.http_sensor import HttpSensor from datetime import datetime with DAG( dag_id="http_sensor_example", start_date=datetime(2023, 1, 1), schedule_interval="@daily", ) as dag: check_api_availability = HttpSensor( task_id="check_api_availability", http_conn_id="my_http_connection", # 定义在 Airflow Connections 中的 HTTP 连接 endpoint="/api/health", # API 端点路径 request_params={}, # 可选的查询参数 response_check=lambda response: response.status_code == 200, poke_interval=60, # 每 60 秒检查一次 timeout=300, # 超时时间(秒) ) </syntaxhighlight> === 参数说明 === * '''http_conn_id''':在 Airflow 的 Connections 中配置的 HTTP 连接信息(如基础 URL)。 * '''endpoint''':要监控的 API 端点路径(会附加到基础 URL 后)。 * '''response_check''':一个可调用对象,用于验证响应是否符合预期(如状态码或内容检查)。 * '''poke_interval''':两次检查之间的间隔时间(秒)。 * '''timeout''':传感器等待的最大时间(秒),超时后任务标记为失败。 == 高级配置 == HttpSensor 支持更复杂的 HTTP 请求配置,例如自定义请求头、请求体和认证方式。 === 自定义请求头和请求体 === <syntaxhighlight lang="python"> check_api_with_headers = HttpSensor( task_id="check_api_with_headers", http_conn_id="my_http_connection", endpoint="/api/data", method="POST", # 使用 POST 方法 headers={"Content-Type": "application/json", "Authorization": "Bearer token123"}, data='{"query": "status"}', # 请求体内容 response_check=lambda response: "ready" in response.text, ) </syntaxhighlight> === 使用响应内容检查 === 可以通过 `response_check` 函数进一步检查响应内容: <syntaxhighlight lang="python"> def check_response(response): # 检查状态码是否为 200 并且响应中包含 "success" 字段 return ( response.status_code == 200 and response.json().get("status") == "success" ) check_api_content = HttpSensor( task_id="check_api_content", http_conn_id="my_http_connection", endpoint="/api/status", response_check=check_response, ) </syntaxhighlight> == 实际案例 == 以下是一个真实场景示例,监控天气 API 是否返回有效数据后再执行后续任务: <syntaxhighlight lang="python"> from airflow import DAG from airflow.sensors.http_sensor import HttpSensor from airflow.operators.python import PythonOperator from datetime import datetime import requests def fetch_weather_data(**kwargs): # 实际获取天气数据的逻辑 response = requests.get("https://api.weather.com/data") return response.json() with DAG( dag_id="weather_data_pipeline", start_date=datetime(2023, 1, 1), schedule_interval="@hourly", ) as dag: check_weather_api = HttpSensor( task_id="check_weather_api", http_conn_id="weather_api_connection", endpoint="/data", response_check=lambda response: response.json().get("available") is True, poke_interval=30, ) fetch_data = PythonOperator( task_id="fetch_weather_data", python_callable=fetch_weather_data, ) check_weather_api >> fetch_data </syntaxhighlight> === 流程图 === <mermaid> graph LR A[开始] --> B[HttpSensor: 检查天气API] B --> C{API可用?} C -->|是| D[PythonOperator: 获取数据] C -->|否| B </mermaid> == 常见问题 == === 1. 如何处理 HTTPS 和 SSL 验证? === 可以通过设置 `extra_options` 参数禁用 SSL 验证(仅限测试环境): <syntaxhighlight lang="python"> HttpSensor( task_id="insecure_request", http_conn_id="my_http_connection", endpoint="/api", extra_options={"verify": False}, # 禁用 SSL 验证 ) </syntaxhighlight> === 2. 如何设置更复杂的重试逻辑? === 可以通过 `retries` 和 `retry_delay` 参数配置重试行为: <syntaxhighlight lang="python"> HttpSensor( task_id="retry_example", http_conn_id="my_http_connection", endpoint="/api", retries=3, # 最大重试次数 retry_delay=timedelta(minutes=5), # 每次重试间隔 ) </syntaxhighlight> == 总结 == HttpSensor 是 Airflow 中用于监控 HTTP 服务的强大工具,适用于以下场景: * 等待外部 API 就绪后再继续执行任务。 * 检查 Web 服务是否返回预期响应。 * 实现工作流中的外部依赖检查。 通过合理配置 `response_check` 和重试参数,可以构建健壮的监控逻辑,确保工作流仅在条件满足时继续执行。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow Sensors应用]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)