|
Spring框架中包含的原始Web框架Spring Web MVC是专门为Servlet API和Servlet容器而构建的。反应性堆栈Web框架Spring WebFlux在更高版本5.0中添加。它是完全非阻塞的,支持 Reactive Streams背压,并在Netty,Undertow和Servlet 3.1+容器等服务器上运行。
这两个Web框架都反映了其源模块的名称(spring-webmvc和 spring-webflux),并在Spring Framework中并存。每个模块都是可选的。应用程序可以使用一个模块,也可以使用两个模块,在某些情况下,也可以使用两个模块,例如,带有react的Spring MVC控制器WebClient。https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html (官网文档)。
一.Spring WebFlux简单实例
1.1.pom.xml:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.yoni</groupId>
<artifactId>spring-webflux</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-webflux</name>
<description>SpringWebFlux</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>1.2.启动类
@SpringBootApplication
public class SpringWebfluxApplication {
public static void main(String[] args) {
SpringApplication.run(SpringWebfluxApplication.class, args);
}
} 1.3.实例
@Slf4j
@RestController
public class DemoController {
private String someString(String name) {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return name;
}
@GetMapping(value = &#34;/mono&#34;)
public Mono<String> monoDemo() {
log.info(&#34;start...&#34;);
Mono<String> mono = Mono.fromSupplier(() -> someString(&#34;hello webflux&#34;));
log.info(&#34;end...&#34;);
return mono;
}
} 1.4.运行结果
控制台打印:hello webflux

图1 Spring WebFlux非阻塞测试
- Mono:表示0-1个元素的异步序列
- Flux:表示0-n个元素的异步序列
@GetMapping(value = &#34;/flux&#34;)
public Flux<String> fluxDemo() {
return Flux.just(&#34;Alibaba&#34;, &#34;Tencent&#34;, &#34;Baidu&#34;);
}
@GetMapping(value = &#34;/fluxArray&#34;)
public Flux<String> fluxArrayDemo() {
String[] arrays = new String[] {&#34;beijing&#34;, &#34;tianjin&#34;, &#34;shanghai&#34;, &#34;chongqing&#34;};
return Flux.fromArray(arrays);
}二.SSE
2.1.SSE简介
SSE(Server-Sent Event,服务端推送时间),HTML5规范中的一个组成部分,一个子规范。由于这是官方特性,主浏览器对其支持是比较好的。
2.2.SSE和WebSocket对比
WebSocket:是双工通道
SSE:是单工通道,只能是服务端想客户端发送消息
2.3.SSE技术规范
- 服务端与浏览器之间的通讯协议
- 浏览器中可供JavaScript使用的EventSource对象
2.4.通讯协议
这个通讯协议是基于纯文本的简单协议。服务器端的响应内容类型必须是“text/event-stream”。响应文本的内容是一个事件流,事件流是一个简单的文本流,仅支持UTF-8格式的编码。
事件流由不同的时间组成,不同时间间通过仅包含回车符合换行符的空行(“\r\n”)来分隔。
每个时间可由多行构成,每行由类型和数据两部分组成。类型与数据通过冒号(“:”)进行分隔,冒号钱的为类型,冒号后的为其对应的值。每个时间可以包含如下类型的行:
- 类型为data,表示该行是时间所包含的数据。以data开头的行可以出现多次。所有这些行都是该时间的数据
- 类型为event,表示该行用来声明事件名称。浏览器在收到数据时,会产生对应名称的事件
- 类型为空白,表示该行是注释,会在处理时被忽略
- 类型为retry,表示浏览器在连接断开之后进行重连等待时间
- 类型为id,表示事件的标识符,标识符用于连续终端后的继连

图2 事件id的事件跟踪功能示意图
2.5.EventSource对象
对于服务端发送的带有事件的响应,浏览器需要在JavaScript中使用EventSource对象进行处理。EventSource使用的是标准的时间监听方式(注意,这里的事件并不是响应中所带的事件,而是浏览器上所发生的事件)。当响应的事件发生时,只需要使EventSource对象调用响应的事件处理方法即可。EventSource提供了三个标准事件。

图3 EventSource提供的标准事件
三.Reactive Stream
3.1.Reactive Stream概述
在流处理机制中发布/订阅模型可以分为push模型和pull模型。push模型中,发布者将元素主动推送给订阅者。而pull模式中,订阅者会向发布者主动索要。
在同步系统中发布者与订阅者的工作效率相当,发布者发布一个消息后阻塞,等待订阅者消费。订阅者消费完后,订阅者阻塞,等待发布者发布。这种同步式处理方式效率很低。
由于同步式处理方式效率很低,一般使用的是异步处理机制。即发布者发布消息,与消费者消费消息的速度是不一样的。比如:
- 情况1:当订阅者消费比发布者发布快时,会出现订阅者无消息可消费的情况
- 情况2:当发布者发布比订阅者消费快时,会出现消息堆积的情况。有两大类解决方案
- 改变订阅者
- 改变发布者。由于订阅者控制发布者的速度。这种解决方案称为背压(Back Pressure)。使用背压策略可确保较快的发布者不会压制较慢的订阅者。
3.2.反应式流规范
上述问题可以通过反应式流模型来进行解决,即:订阅者向发布者发送异步请求,订阅n个元素;然后发布者向订阅者异步发送n个或少于n个的元素。反应式流会在pull模型和push模型流处理机制之间动态切换。当发布者快订阅者慢时,它使用pull模型;当发布者慢订阅者快时,它使用push模型。
3.2.1.Flow()类
用于创建爱你流程控制组件的相关的接口与方法,其中Publishers生成由一个或多个Subscriber消费的items,每个items由Subscription管理。
3.2.2.Publisher<T>()接口
被订阅者接收的items的生产者。每个当前的订阅者以相同的顺序接收相同的items,除非items被删除或发生错误。如果一个发布者发生了“不允许items发布给订阅者”的错误,那么订阅者将处罚onError()方法,并且不再接受消息。否则,当发布者没有消息在发给订阅者时,订阅者将触发onComplete()方法。发布者可以确保每一个订阅者的订阅方法调用都严格按照顺序执行。
当Publisher调用close()方法后,就表示没有消息在发不了,其会触发onComplate()方法的执行。
3.2.2.1.Subscribe()接口
如果可能,添加给定的订阅者。如果已订阅,或者由于策略违反或错误而订阅失败,则使用Illegalstateexception调用订阅者的onError()方法。否则,订阅者的onSubscribe()方法伴随着新的订阅关系而被调用。订阅者可以通过调用此订阅令牌的request()方法来接收item,也可以通过调用其cancel()方法来取消订阅关系。
3.2.3.Subscriber<T>()接口
3.2.3.1.onSubscribe()
对于给定订阅关系的订阅者的其它方法调用之前先被调用的方法。当publisher调用了subscribe()方法与指定的订阅者确立了订阅关系后,就会触发当前方法的执行,该方法就会被调用一次。
3.2.3.2.onNext()
调用订阅令牌的下一个item的方法。如果该方法抛出异常,结果行为就不能被保证,但可能引起订阅关系取消。只有当前的item被消费完毕了,才会调用下一个。onNext()方法就是item被消费的地方。即当前onNext()方法的执行完毕,就会触发下一个onNext()方法的执行。onNext()被触发的次数,由订阅者Subscriber通过调用Subscription的request()方法请求的消息数量决定的。第一个onNext()方法是由Subscription接口的request()方法执行的。
3.2.3.3.onError()
当发布者或订阅令牌发生不可恢复的错误时调用的方法,在该错误之后,订阅令牌不会在调用其它订阅者方法。如果该方法本身发生异常,结果行为未定义。触发onError()方法的事件:发布者发布的消息不是订阅者的需求时;在确立订阅关系时出现异常;当Subscription的request()方法参数小于等于0时。
3.2.3.4.onComplate()
对于不是被错误终止的订阅关系,当订阅者知道没有其它订阅者方法调用再发生时,该方法被调用。对于该订阅关系,该方法之后将不再有其它订阅者方法被调用。如果该方法本身发生异常,结果行为未定义。该方法执行后,onNext()方法将不再执行,当Publisher调用了close()方法后,就表示没有消息在发布了,其会触发onComplate()方法的执行。
3.2.4.Subscription()接口
连接发布者与订阅者的消息控制器,只有当请求时订阅者才可接收items,并且可以在任何时间取消。这个接口中的方法只能被它的订阅者调用。在其它上下文环境中的用法没有定义其效果(即在其它类中调用该接口对象的方法最终的执行效果未定义)。其一般被翻译为“订阅令牌”或“订阅关系”。Subscription的方法只能被订阅者Subscriber调用,作为Subscriber中调用的对象出现。
3.2.4.1.request()
给未满足需求的订阅令牌添加指定数量n的items。若n小于等于0,订阅者将触发onError()方法,并被标记为IllegalArgumentException异常。否则(即n大于0),订阅者将触发n次onNext()调用(或更少,如果终止)。
3.2.4.2.cancel()
导致订阅者最终停止接收消息。实现会尽力而为,该方法被调用后可能还会收到其它消息。一个被取消的订阅令牌不再需要接收onComplate()或onError()的信号。
3.2.5.三个接口的关系

图3 以上三个接口关系简图
3.2.6.Processor<T, R>接口
Processor,即处理器,充当订阅者和发布者的处理阶段。Processor接口继承了Publisher和Subscriber接口,对于发布者来说,Processor是订阅者,对于订阅者来说,Processor是发布者。
Processor用于转换发布者/订阅者管道中的元素。Processor<T, R>会将来自于发布者的T类型的消息数据,接收并转换为R类型的数据,并将转换后的R类型数据发布给订阅者。一个发布者可以拥有多个处理中。
3.3.发布者类SubmissionPublisher
SubmissionPublisher是一个发布者,它能够以异步方式将已提交的非空元素发布给订阅者,直到该发布者被关闭。每个当前订阅者都会以相同的顺序接收新提交的元素,除非遇到删除或异常。使用SubmissionPublisher对象允许元素生成器以符合响应流的方式工作。发布者依靠丢弃处理和/或阻塞来进行流量控制。即:SubmissionPublisher是一个发布者,能够以响应流的方式生成消息元素,以异步方式发布消息元素。
3.3.1.submit()
将给定item发布到每个当前的订阅者,并通过异步方式调用其onNext()方法。当任何为订阅者准备的资源不可用时会阻塞,并且阻塞将不会被打断。
3.3.2.close()
除非已经关闭,否则会向当前所有订阅者发出执行onComplete()方法的信号,并且不允许后续发布尝试。返回时,该方法不保证所有订阅者都已完成。该方法的执行会向所有订阅者发送的“可以执行onComplate()方法”的信号,但各个订阅者是否真正能够执行onComplate()方法,当前方法无法保证,与订阅者的具体执行情况相关。
3.4.反应式流编程(发布-订阅模式)
@Slf4j
public class SomeSubscriber implements Flow.Subscriber<Integer> {
// 声明订阅令牌
private Flow.Subscription subscription;
/**
* 当订阅关系确立时会触发该方法的执行,即publisher的subscriber()方法的执行
* 会触发该方法的执行
* @param subscription
*/
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info(&#34;------>执行订阅者的onSubscribe()方法<------&#34;);
this.subscription = subscription;
// 首次订阅10条
this.subscription.request(10);
}
/**
* 对消息的消费过程就在这
* 该方法的第一次触发是由onSubscribe()方法中的this.subscription.request(10);触发
* @param item
*/
@Override
public void onNext(Integer item) {
log.info(&#34;------>订阅者正在消费该item<------&#34; + item);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 订阅者没消费一条消息,就会在订阅5条
this.subscription.request(5);
}
/**
* 当发布者发布过程中,或订阅关系确立过程中,或者订阅者请求消息过程中,或消费者消费过程中出现异常过程中,
* 则会触发该方法。
* @param throwable
*/
@Override
public void onError(Throwable throwable) {
log.info(&#34;------>执行onError()方法<------&#34;);
throwable.printStackTrace();
// 取消订阅关系
this.subscription.cancel();
}
/**
* 当所有消息消费完毕后会执行该方法
*/
@Override
public void onComplete() {
log.info(&#34;------>发布者已关闭,订阅者已将消息全部消费完毕<------&#34;);
}
}
@Slf4j
public class TestSubscriber {
public static void main(String[] args) {
// 创建发布者
SubmissionPublisher<Integer> publisher = null;
try {
publisher = new SubmissionPublisher<>();
// 创建订阅者
SomeSubscriber subscriber = new SomeSubscriber();
// 建立订阅关系
publisher.subscribe(subscriber);
// 生产消费
for (int i = 0; i < 300; i++) {
// 生成一个[0, 100)的随机数
int item = new Random().nextInt(100);
log.info(&#34;生产出第&#34; + i + &#34;条消息,&#34; + item);
// 发布消息
publisher.submit(item);
}
} finally {
// 当所有消息发布完毕,关闭发布者
if (publisher != null)
publisher.close();
}
try {
log.info(&#34;休眠:&#34; + LocalDateTime.now().format(DateTimeFormatter.ofPattern(&#34;yyyy-MM-dd HH:ss:mm&#34;)));
TimeUnit.SECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} 3.5.响应式流编程(发布-处理-订阅模式)
@Slf4j
public class SomeProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {
// 声明订阅令牌
private Flow.Subscription subscription;
/**
* 当订阅关系确立时会触发该方法的执行,即publisher的subscriber()方法的执行
* 会触发该方法的执行
* @param subscription
*/
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info(&#34;------>执行订阅者的onSubscribe()方法<------&#34;);
this.subscription = subscription;
// 首次订阅10条
this.subscription.request(10);
}
/**
* 对消息的消费过程就在这
* 该方法的第一次触发是由onSubscribe()方法中的this.subscription.request(10);触发
* @param item
*/
@Override
public void onNext(Integer item) {
log.info(&#34;------>订阅者正在消费该item<------&#34; + item);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 订阅者没消费一条消息,就会在订阅5条
this.subscription.request(5);
}
/**
* 当发布者发布过程中,或订阅关系确立过程中,或者订阅者请求消息过程中,或消费者消费过程中出现异常过程中,
* 则会触发该方法。
* @param throwable
*/
@Override
public void onError(Throwable throwable) {
log.info(&#34;------>执行onError()方法<------&#34;);
throwable.printStackTrace();
// 取消订阅关系
this.subscription.cancel();
}
/**
* 当所有消息消费完毕后会执行该方法
*/
@Override
public void onComplete() {
log.info(&#34;------>发布者已关闭,订阅者已将消息全部消费完毕<------&#34;);
}
} |
|