Spring反应式编程
外观
Spring反应式编程[编辑 | 编辑源代码]
概述[编辑 | 编辑源代码]
Spring反应式编程(Reactive Programming)是Spring Framework 5引入的核心特性,基于Reactive Streams规范构建的非阻塞异步编程模型。它通过Publisher-Subscriber模式处理数据流,适用于高并发、低延迟的应用场景,与传统同步阻塞式编程形成对比。
关键组件:
- Project Reactor(Spring官方采用的实现库)
- WebFlux(反应式Web框架)
- Reactive Repositories(反应式数据访问)
数学本质可表示为数据流变换:
核心概念[编辑 | 编辑源代码]
响应式流规范[编辑 | 编辑源代码]
Reactive Streams定义四个接口:
// 发布者
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
// 订阅者
public interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
// 订阅控制
public interface Subscription {
void request(long n);
void cancel();
}
// 处理器
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}
Reactor核心类型[编辑 | 编辑源代码]
类型 | 说明 | 特点 |
---|---|---|
Mono<T> |
0-1个元素的流 | 类似Optional
|
Flux<T> |
0-N个元素的流 | 类似List
|
代码示例[编辑 | 编辑源代码]
基础创建[编辑 | 编辑源代码]
// 创建Flux
Flux<String> flux = Flux.just("Spring", "Boot", "Reactive")
.map(String::toUpperCase)
.filter(s -> s.length() > 4);
// 订阅消费
flux.subscribe(
item -> System.out.println("Received: " + item),
error -> System.err.println("Error: " + error),
() -> System.out.println("Completed")
);
输出:
Received: SPRING Received: REACTIVE Completed
WebFlux控制器[编辑 | 编辑源代码]
@RestController
public class ReactiveController {
@GetMapping("/reactive")
public Flux<String> getReactiveData() {
return Flux.interval(Duration.ofSeconds(1))
.map(i -> "Event " + i)
.take(5);
}
}
客户端将每秒收到一个事件:
Event 0 Event 1 Event 2 Event 3 Event 4
实际应用场景[编辑 | 编辑源代码]
场景:实时股票报价系统
1. 使用Flux
持续推送市场数据
2. 背压控制防止客户端过载
3. WebSocket实现全双工通信
实现代码片段:
@GetMapping(value = "/stocks", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<StockQuote> streamQuotes() {
return stockService
.getLiveQuotes()
.onBackpressureDrop(q -> log.warn("Dropped: " + q));
}
与传统编程对比[编辑 | 编辑源代码]
维度 | 传统方式 | 反应式 |
---|---|---|
线程模型 | 1请求1线程 | 事件循环 |
资源消耗 | 高内存占用 | 低内存占用 |
吞吐量 | 受限于线程数 | 支持更高并发 |
代码风格 | 命令式 | 声明式 |
高级特性[编辑 | 编辑源代码]
背压处理[编辑 | 编辑源代码]
当生产者速度>消费者时,通过策略控制流量:
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()
调度器控制[编辑 | 编辑源代码]
通过Schedulers
指定执行上下文:
Flux.range(1, 10)
.publishOn(Schedulers.parallel())
.subscribe(i -> System.out.println(Thread.currentThread().getName()));
最佳实践[编辑 | 编辑源代码]
1. 避免在反应式链中阻塞调用
2. 合理使用subscribeOn
/publishOn
3. 始终处理错误流(onError
)
4. 测试时使用StepVerifier
学习资源[编辑 | 编辑源代码]
- Project Reactor官方文档
- ReactiveX操作符图表
- Spring WebFlux参考指南