跳转到内容
主菜单
主菜单
移至侧栏
隐藏
导航
首页
最近更改
随机页面
MediaWiki帮助
代码酷
搜索
搜索
中文(中国大陆)
外观
创建账号
登录
个人工具
创建账号
登录
未登录编辑者的页面
了解详情
贡献
讨论
编辑“︁
Airflow SqlSensor
”︁
页面
讨论
大陆简体
阅读
编辑
编辑源代码
查看历史
工具
工具
移至侧栏
隐藏
操作
阅读
编辑
编辑源代码
查看历史
常规
链入页面
相关更改
特殊页面
页面信息
外观
移至侧栏
隐藏
您的更改会在有权核准的用户核准后向读者展示。
警告:
您没有登录。如果您进行任何编辑,您的IP地址会公开展示。如果您
登录
或
创建账号
,您的编辑会以您的用户名署名,此外还有其他益处。
反垃圾检查。
不要
加入这个!
= Airflow SqlSensor = '''SqlSensor''' 是 Apache Airflow 中的一个关键传感器(Sensor),用于监控数据库查询结果,并根据条件触发后续任务。它属于 Airflow 的传感器类别,专门用于等待某些外部条件满足后再继续执行工作流。 == 介绍 == SqlSensor 允许用户定义一个 SQL 查询,并定期执行该查询,直到查询返回的结果满足特定条件(例如,返回非空值或符合某个阈值)。这在数据管道中非常有用,例如等待某个表中的数据到达、检查某个指标是否达到预期值,或确认外部系统已完成数据处理。 SqlSensor 继承自基类 `airflow.sensors.base.BaseSensorOperator`,并支持以下核心功能: * 定期轮询数据库 * 检查 SQL 查询结果是否符合条件 * 超时处理和重试机制 == 基本用法 == 以下是 SqlSensor 的基本使用示例: <syntaxhighlight lang="python"> from airflow import DAG from airflow.sensors.sql import SqlSensor from airflow.utils.dates import days_ago dag = DAG( 'sql_sensor_example', schedule_interval=None, start_date=days_ago(1), ) # 定义一个 SqlSensor,检查表中是否存在符合条件的记录 wait_for_data = SqlSensor( task_id='wait_for_data', conn_id='my_db_connection', sql="SELECT COUNT(*) FROM my_table WHERE status = 'processed'", mode='poke', timeout=3600, poke_interval=60, dag=dag, ) </syntaxhighlight> === 参数说明 === * '''conn_id''':配置在 Airflow 中的数据库连接 ID * '''sql''':要执行的 SQL 查询 * '''mode''':传感器模式,通常为 'poke'(持续检查)或 'reschedule'(在检查间隔期间释放工作线程) * '''timeout''':传感器等待的总时间(秒) * '''poke_interval''':两次检查之间的间隔时间(秒) == 工作原理 == SqlSensor 的工作流程可以用以下 mermaid 图表示: <mermaid> graph TD A[开始] --> B[执行SQL查询] B --> C{结果满足条件?} C -->|是| D[标记为成功] C -->|否| E{超时?} E -->|是| F[标记为失败] E -->|否| G[等待poke_interval] G --> B </mermaid> == 高级配置 == === 自定义条件判断 === 默认情况下,SqlSensor 认为查询返回非空结果即为条件满足。但可以通过继承 SqlSensor 并重写 `poke()` 方法来实现自定义逻辑: <syntaxhighlight lang="python"> from airflow.sensors.sql import SqlSensor class CustomSqlSensor(SqlSensor): def poke(self, context): result = super().poke(context) # 自定义条件:要求返回的数值大于10 return int(result) > 10 if result else False </syntaxhighlight> === 使用模板 === SqlSensor 支持 Airflow 的模板功能,可以在 SQL 查询中使用 Jinja2 模板变量: <syntaxhighlight lang="python"> wait_for_specific_date = SqlSensor( task_id='wait_for_specific_date', conn_id='my_db_connection', sql="SELECT COUNT(*) FROM events WHERE event_date = '{{ ds }}'", dag=dag, ) </syntaxhighlight> == 实际案例 == === 案例1:等待数据加载完成 === 假设有一个夜间 ETL 流程,需要等待源系统中的数据加载完成才能开始处理: <syntaxhighlight lang="python"> wait_for_etl_completion = SqlSensor( task_id='wait_for_etl_completion', conn_id='data_warehouse', sql=""" SELECT 1 FROM etl_control_table WHERE date = CURRENT_DATE AND status = 'COMPLETED' """, timeout=7200, # 等待2小时 poke_interval=300, # 每5分钟检查一次 dag=dag, ) </syntaxhighlight> === 案例2:检查数据质量 === 在数据管道中确保关键指标达到预期阈值: <syntaxhighlight lang="python"> check_data_quality = SqlSensor( task_id='check_data_quality', conn_id='analytics_db', sql=""" SELECT CASE WHEN COUNT(*) > 1000000 THEN 1 ELSE NULL END FROM user_activity WHERE event_date = '{{ ds }}' """, mode='reschedule', dag=dag, ) </syntaxhighlight> == 性能考虑 == 使用 SqlSensor 时应注意: 1. '''查询效率''':确保 SQL 查询高效,避免全表扫描 2. '''轮询间隔''':根据业务需求合理设置 poke_interval 3. '''连接管理''':避免过多并发连接导致数据库压力 == 数学表达 == SqlSensor 的等待时间可以用以下公式表示: <math> T_{total} = \min\left(T_{timeout}, N \times T_{interval}\right) </math> 其中: * <math>T_{total}</math> 是实际等待的总时间 * <math>T_{timeout}</math> 是配置的超时时间 * <math>T_{interval}</math> 是轮询间隔 * <math>N</math> 是实际执行的轮询次数 == 常见问题 == === 1. 如何处理长运行查询? === 对于可能执行时间较长的查询,建议: * 设置适当的 timeout * 考虑使用存储过程替代复杂查询 * 在数据库中创建适当的索引 === 2. 传感器一直等待怎么办? === 检查: * SQL 查询是否正确返回预期结果 * 数据库连接是否正常 * 是否有足够的权限执行查询 == 总结 == SqlSensor 是 Airflow 中强大的数据库监控工具,通过灵活配置可以实现多种等待策略。合理使用 SqlSensor 可以使数据管道更加健壮和可靠,确保任务只在数据准备好的情况下执行。 [[Category:大数据框架]] [[Category:Airflow]] [[Category:Airflow Sensors应用]]
摘要:
请注意,所有对代码酷的贡献均被视为依照知识共享署名-非商业性使用-相同方式共享发表(详情请见
代码酷:著作权
)。如果您不希望您的文字作品被随意编辑和分发传播,请不要在此提交。
您同时也向我们承诺,您提交的内容为您自己所创作,或是复制自公共领域或类似自由来源。
未经许可,请勿提交受著作权保护的作品!
取消
编辑帮助
(在新窗口中打开)