跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow Sensor模式
”︁
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow Sensor模式 = == 介绍 == '''Airflow Sensor模式'''是Apache Airflow中一种特殊的操作符(Operator),用于持续监测外部系统状态,直到满足特定条件后才触发后续任务。传感器(Sensor)通过轮询或异步机制检查目标状态(如文件是否存在、数据库记录是否更新等),常用于工作流中对依赖条件的监控。与普通Operator不同,Sensor会阻塞任务执行直到条件满足或超时。 Sensor的核心特性包括: * '''阻塞性''':任务停留在运行状态,直到条件满足。 * '''可配置超时''':避免无限等待,通过`timeout`参数控制最长等待时间。 * '''多种模式''':包括硬性等待(poke模式)和延迟重试(reschedule模式)。 == 工作原理 == Sensor通过以下两种模式运行: === 1. Poke模式(默认) === 传感器在同一个工作进程中持续轮询(默认间隔60秒),占用任务槽(slot)直到条件满足。 <mermaid> graph TD A[开始] --> B{条件满足?} B -- 否 --> C[等待poke_interval] C --> B B -- 是 --> D[触发下游任务] </mermaid> === 2. Reschedule模式 === 传感器每次检查后释放任务槽,并在下次检查时重新调度。适合长时间等待的场景,减少资源占用。 <mermaid> graph TD A[开始] --> B{条件满足?} B -- 否 --> C[释放任务槽并计划下次检查] C --> D[等待mode.reschedule] D --> B B -- 是 --> E[触发下游任务] </mermaid> == 代码示例 == 以下示例展示如何定义一个监测文件是否存在的`FileSensor`: <syntaxhighlight lang="python"> from airflow import DAG from airflow.sensors.filesystem import FileSensor from datetime import datetime with DAG('file_sensor_example', start_date=datetime(2023, 1, 1)) as dag: wait_for_file = FileSensor( task_id='wait_for_file', filepath='/data/example.txt', mode='poke', # 或 'reschedule' poke_interval=30, # 检查间隔(秒) timeout=3600, # 超时时间(秒) ) </syntaxhighlight> '''输出说明''': * 若文件在1小时内出现,任务标记为`success`并触发下游任务。 * 若超时未满足条件,任务标记为`failed`。 == 实际应用场景 == === 场景1:数据管道依赖检查 === 监测上游系统是否生成数据文件,确保ETL任务仅在数据就绪时执行。 <syntaxhighlight lang="python"> from airflow.sensors.external_task import ExternalTaskSensor wait_for_upstream = ExternalTaskSensor( task_id='wait_for_upstream', external_dag_id='upstream_dag', external_task_id='load_data', mode='reschedule', timeout=86400 # 最长等待24小时 ) </syntaxhighlight> === 场景2:API响应监测 === 使用`HttpSensor`检查API服务是否可用: <syntaxhighlight lang="python"> from airflow.sensors.http_sensor import HttpSensor check_api = HttpSensor( task_id='check_api', endpoint='/health', response_check=lambda response: response.status_code == 200, mode='poke', poke_interval=10 ) </syntaxhighlight> == 高级配置 == === 自定义Sensor === 继承`BaseSensorOperator`实现自定义逻辑: <syntaxhighlight lang="python"> from airflow.sensors.base import BaseSensorOperator class CustomSensor(BaseSensorOperator): def poke(self, context): return check_custom_condition() # 返回True/False custom_sensor = CustomSensor(task_id='custom_sensor') </syntaxhighlight> === 超时与指数退避 === 通过`exponential_backoff`参数优化轮询效率: <math> \text{wait\_time} = \text{poke\_interval} \times 2^{(\text{retry\_count} - 1)} </math> <syntaxhighlight lang="python"> FileSensor( ..., exponential_backoff=True, # 启用指数退避 poke_interval=10 ) </syntaxhighlight> == 最佳实践 == * 短时检查(<5分钟)使用`poke`模式。 * 长时等待(>5分钟)使用`reschedule`模式以释放资源。 * 为`timeout`设置合理值,避免任务卡死。 * 在Sensor任务添加描述(`doc_md`)说明检查条件。 == 常见问题 == '''Q: Sensor卡住不退出怎么办?''' A: 检查`timeout`是否过小,或条件逻辑是否永远无法满足。 '''Q: 如何减少Sensor资源消耗?''' A: 增大`poke_interval`或切换至`reschedule`模式。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow Sensors应用]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)