跳转到内容

Spring反应式编程

来自代码酷
Admin留言 | 贡献2025年5月1日 (四) 23:18的版本 (Page creation by admin bot)

(差异) ←上一版本 | 已核准修订 (差异) | 最后版本 (差异) | 下一版本→ (差异)

Spring反应式编程[编辑 | 编辑源代码]

概述[编辑 | 编辑源代码]

Spring反应式编程(Reactive Programming)是Spring Framework 5引入的核心特性,基于Reactive Streams规范构建的非阻塞异步编程模型。它通过Publisher-Subscriber模式处理数据流,适用于高并发、低延迟的应用场景,与传统同步阻塞式编程形成对比。

关键组件:

  • Project Reactor(Spring官方采用的实现库)
  • WebFlux(反应式Web框架)
  • Reactive Repositories(反应式数据访问)

数学本质可表示为数据流变换: F(x):Publisher<T>Publisher<R>

核心概念[编辑 | 编辑源代码]

响应式流规范[编辑 | 编辑源代码]

Reactive Streams定义四个接口:

// 发布者
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

// 订阅者
public interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

// 订阅控制
public interface Subscription {
    void request(long n);
    void cancel();
}

// 处理器
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}

Reactor核心类型[编辑 | 编辑源代码]

Reactor类型对比
类型 说明 特点
Mono<T> 0-1个元素的流 类似Optional
Flux<T> 0-N个元素的流 类似List

sequenceDiagram Publisher->>Subscriber: onSubscribe(Subscription) Subscriber->>Subscription: request(3) Publisher->>Subscriber: onNext(item1) Publisher->>Subscriber: onNext(item2) Publisher->>Subscriber: onComplete()

代码示例[编辑 | 编辑源代码]

基础创建[编辑 | 编辑源代码]

// 创建Flux
Flux<String> flux = Flux.just("Spring", "Boot", "Reactive")
    .map(String::toUpperCase)
    .filter(s -> s.length() > 4);

// 订阅消费
flux.subscribe(
    item -> System.out.println("Received: " + item),
    error -> System.err.println("Error: " + error),
    () -> System.out.println("Completed")
);

输出:

Received: SPRING
Received: REACTIVE
Completed

WebFlux控制器[编辑 | 编辑源代码]

@RestController
public class ReactiveController {

    @GetMapping("/reactive")
    public Flux<String> getReactiveData() {
        return Flux.interval(Duration.ofSeconds(1))
            .map(i -> "Event " + i)
            .take(5);
    }
}

客户端将每秒收到一个事件:

Event 0
Event 1
Event 2
Event 3
Event 4

实际应用场景[编辑 | 编辑源代码]

场景:实时股票报价系统 1. 使用Flux持续推送市场数据 2. 背压控制防止客户端过载 3. WebSocket实现全双工通信

graph LR A[Market Data Feed] --> B(WebFlux Server) B --> C[WebSocket Client 1] B --> D[WebSocket Client 2] B --> E[WebSocket Client N]

实现代码片段:

@GetMapping(value = "/stocks", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<StockQuote> streamQuotes() {
    return stockService
        .getLiveQuotes()
        .onBackpressureDrop(q -> log.warn("Dropped: " + q));
}

与传统编程对比[编辑 | 编辑源代码]

同步vs反应式对比
维度 传统方式 反应式
线程模型 1请求1线程 事件循环
资源消耗 高内存占用 低内存占用
吞吐量 受限于线程数 支持更高并发
代码风格 命令式 声明式

高级特性[编辑 | 编辑源代码]

背压处理[编辑 | 编辑源代码]

当生产者速度>消费者时,通过策略控制流量:

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

调度器控制[编辑 | 编辑源代码]

通过Schedulers指定执行上下文:

Flux.range(1, 10)
    .publishOn(Schedulers.parallel())
    .subscribe(i -> System.out.println(Thread.currentThread().getName()));

最佳实践[编辑 | 编辑源代码]

1. 避免在反应式链中阻塞调用 2. 合理使用subscribeOn/publishOn 3. 始终处理错误流(onError) 4. 测试时使用StepVerifier

学习资源[编辑 | 编辑源代码]

  • Project Reactor官方文档
  • ReactiveX操作符图表
  • Spring WebFlux参考指南