启明办公

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 96|回复: 0

Spring WebFlux-学习笔记(简单概念)

[复制链接]

3

主题

4

帖子

10

积分

新手上路

Rank: 1

积分
10
发表于 2022-12-14 19:15:11 | 显示全部楼层 |阅读模式
Spring框架中包含的原始Web框架Spring Web MVC是专门为Servlet API和Servlet容器而构建的。反应性堆栈Web框架Spring WebFlux在更高版本5.0中添加。它是完全非阻塞的,支持 Reactive Streams背压,并在Netty,Undertow和Servlet 3.1+容器等服务器上运行。
这两个Web框架都反映了其源模块的名称(spring-webmvc和 spring-webflux),并在Spring Framework中并存。每个模块都是可选的。应用程序可以使用一个模块,也可以使用两个模块,在某些情况下,也可以使用两个模块,例如,带有react的Spring MVC控制器WebClient。https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html (官网文档)。
一.Spring WebFlux简单实例

1.1.pom.xml:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.yoni</groupId>
    <artifactId>spring-webflux</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-webflux</name>
    <description>SpringWebFlux</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>1.2.启动类

@SpringBootApplication
public class SpringWebfluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringWebfluxApplication.class, args);
    }

} 1.3.实例

@Slf4j
@RestController
public class DemoController {

    private String someString(String name) {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return name;
    }


    @GetMapping(value = "/mono")
    public Mono<String> monoDemo() {
        log.info("start...");
        Mono<String> mono = Mono.fromSupplier(() -> someString("hello webflux"));
        log.info("end...");
        return mono;
    }
} 1.4.运行结果

控制台打印:hello webflux



图1 Spring WebFlux非阻塞测试


  • Mono:表示0-1个元素的异步序列
  • Flux:表示0-n个元素的异步序列
    @GetMapping(value = "/flux")
    public Flux<String> fluxDemo() {
        return Flux.just("Alibaba", "Tencent", "Baidu");
    }

    @GetMapping(value = "/fluxArray")
    public Flux<String> fluxArrayDemo() {
        String[] arrays = new String[] {"beijing", "tianjin", "shanghai", "chongqing"};
        return Flux.fromArray(arrays);
    }二.SSE

2.1.SSE简介

SSE(Server-Sent Event,服务端推送时间),HTML5规范中的一个组成部分,一个子规范。由于这是官方特性,主浏览器对其支持是比较好的。
2.2.SSE和WebSocket对比

WebSocket:是双工通道
SSE:是单工通道,只能是服务端想客户端发送消息
2.3.SSE技术规范


  • 服务端与浏览器之间的通讯协议
  • 浏览器中可供JavaScript使用的EventSource对象
2.4.通讯协议

这个通讯协议是基于纯文本的简单协议。服务器端的响应内容类型必须是“text/event-stream”。响应文本的内容是一个事件流,事件流是一个简单的文本流,仅支持UTF-8格式的编码。
事件流由不同的时间组成,不同时间间通过仅包含回车符合换行符的空行(“\r\n”)来分隔。
每个时间可由多行构成,每行由类型和数据两部分组成。类型与数据通过冒号(“:”)进行分隔,冒号钱的为类型,冒号后的为其对应的值。每个时间可以包含如下类型的行:

  • 类型为data,表示该行是时间所包含的数据。以data开头的行可以出现多次。所有这些行都是该时间的数据
  • 类型为event,表示该行用来声明事件名称。浏览器在收到数据时,会产生对应名称的事件
  • 类型为空白,表示该行是注释,会在处理时被忽略
  • 类型为retry,表示浏览器在连接断开之后进行重连等待时间
  • 类型为id,表示事件的标识符,标识符用于连续终端后的继连



图2 事件id的事件跟踪功能示意图

2.5.EventSource对象

对于服务端发送的带有事件的响应,浏览器需要在JavaScript中使用EventSource对象进行处理。EventSource使用的是标准的时间监听方式(注意,这里的事件并不是响应中所带的事件,而是浏览器上所发生的事件)。当响应的事件发生时,只需要使EventSource对象调用响应的事件处理方法即可。EventSource提供了三个标准事件。



图3 EventSource提供的标准事件

三.Reactive Stream

3.1.Reactive Stream概述

在流处理机制中发布/订阅模型可以分为push模型和pull模型。push模型中,发布者将元素主动推送给订阅者。而pull模式中,订阅者会向发布者主动索要。
在同步系统中发布者与订阅者的工作效率相当,发布者发布一个消息后阻塞,等待订阅者消费。订阅者消费完后,订阅者阻塞,等待发布者发布。这种同步式处理方式效率很低。
由于同步式处理方式效率很低,一般使用的是异步处理机制。即发布者发布消息,与消费者消费消息的速度是不一样的。比如:

  • 情况1:当订阅者消费比发布者发布快时,会出现订阅者无消息可消费的情况
  • 情况2:当发布者发布比订阅者消费快时,会出现消息堆积的情况。有两大类解决方案

    • 改变订阅者
    • 改变发布者。由于订阅者控制发布者的速度。这种解决方案称为背压(Back Pressure)。使用背压策略可确保较快的发布者不会压制较慢的订阅者。

3.2.反应式流规范

上述问题可以通过反应式流模型来进行解决,即:订阅者向发布者发送异步请求,订阅n个元素;然后发布者向订阅者异步发送n个或少于n个的元素。反应式流会在pull模型和push模型流处理机制之间动态切换。当发布者快订阅者慢时,它使用pull模型;当发布者慢订阅者快时,它使用push模型。
3.2.1.Flow()类
用于创建爱你流程控制组件的相关的接口与方法,其中Publishers生成由一个或多个Subscriber消费的items,每个items由Subscription管理。
3.2.2.Publisher<T>()接口
被订阅者接收的items的生产者。每个当前的订阅者以相同的顺序接收相同的items,除非items被删除或发生错误。如果一个发布者发生了“不允许items发布给订阅者”的错误,那么订阅者将处罚onError()方法,并且不再接受消息。否则,当发布者没有消息在发给订阅者时,订阅者将触发onComplete()方法。发布者可以确保每一个订阅者的订阅方法调用都严格按照顺序执行。
当Publisher调用close()方法后,就表示没有消息在发不了,其会触发onComplate()方法的执行。
3.2.2.1.Subscribe()接口
如果可能,添加给定的订阅者。如果已订阅,或者由于策略违反或错误而订阅失败,则使用Illegalstateexception调用订阅者的onError()方法。否则,订阅者的onSubscribe()方法伴随着新的订阅关系而被调用。订阅者可以通过调用此订阅令牌的request()方法来接收item,也可以通过调用其cancel()方法来取消订阅关系。
3.2.3.Subscriber<T>()接口
3.2.3.1.onSubscribe()
对于给定订阅关系的订阅者的其它方法调用之前先被调用的方法。当publisher调用了subscribe()方法与指定的订阅者确立了订阅关系后,就会触发当前方法的执行,该方法就会被调用一次。
3.2.3.2.onNext()
调用订阅令牌的下一个item的方法。如果该方法抛出异常,结果行为就不能被保证,但可能引起订阅关系取消。只有当前的item被消费完毕了,才会调用下一个。onNext()方法就是item被消费的地方。即当前onNext()方法的执行完毕,就会触发下一个onNext()方法的执行。onNext()被触发的次数,由订阅者Subscriber通过调用Subscription的request()方法请求的消息数量决定的。第一个onNext()方法是由Subscription接口的request()方法执行的。
3.2.3.3.onError()
当发布者或订阅令牌发生不可恢复的错误时调用的方法,在该错误之后,订阅令牌不会在调用其它订阅者方法。如果该方法本身发生异常,结果行为未定义。触发onError()方法的事件:发布者发布的消息不是订阅者的需求时;在确立订阅关系时出现异常;当Subscription的request()方法参数小于等于0时。
3.2.3.4.onComplate()
对于不是被错误终止的订阅关系,当订阅者知道没有其它订阅者方法调用再发生时,该方法被调用。对于该订阅关系,该方法之后将不再有其它订阅者方法被调用。如果该方法本身发生异常,结果行为未定义。该方法执行后,onNext()方法将不再执行,当Publisher调用了close()方法后,就表示没有消息在发布了,其会触发onComplate()方法的执行。
3.2.4.Subscription()接口
连接发布者与订阅者的消息控制器,只有当请求时订阅者才可接收items,并且可以在任何时间取消。这个接口中的方法只能被它的订阅者调用。在其它上下文环境中的用法没有定义其效果(即在其它类中调用该接口对象的方法最终的执行效果未定义)。其一般被翻译为“订阅令牌”或“订阅关系”。Subscription的方法只能被订阅者Subscriber调用,作为Subscriber中调用的对象出现。
3.2.4.1.request()
给未满足需求的订阅令牌添加指定数量n的items。若n小于等于0,订阅者将触发onError()方法,并被标记为IllegalArgumentException异常。否则(即n大于0),订阅者将触发n次onNext()调用(或更少,如果终止)。
3.2.4.2.cancel()
导致订阅者最终停止接收消息。实现会尽力而为,该方法被调用后可能还会收到其它消息。一个被取消的订阅令牌不再需要接收onComplate()或onError()的信号。
3.2.5.三个接口的关系



图3 以上三个接口关系简图

3.2.6.Processor<T, R>接口
Processor,即处理器,充当订阅者和发布者的处理阶段。Processor接口继承了Publisher和Subscriber接口,对于发布者来说,Processor是订阅者,对于订阅者来说,Processor是发布者。
Processor用于转换发布者/订阅者管道中的元素。Processor<T, R>会将来自于发布者的T类型的消息数据,接收并转换为R类型的数据,并将转换后的R类型数据发布给订阅者。一个发布者可以拥有多个处理中。
3.3.发布者类SubmissionPublisher

SubmissionPublisher是一个发布者,它能够以异步方式将已提交的非空元素发布给订阅者,直到该发布者被关闭。每个当前订阅者都会以相同的顺序接收新提交的元素,除非遇到删除或异常。使用SubmissionPublisher对象允许元素生成器以符合响应流的方式工作。发布者依靠丢弃处理和/或阻塞来进行流量控制。即:SubmissionPublisher是一个发布者,能够以响应流的方式生成消息元素,以异步方式发布消息元素。
3.3.1.submit()
将给定item发布到每个当前的订阅者,并通过异步方式调用其onNext()方法。当任何为订阅者准备的资源不可用时会阻塞,并且阻塞将不会被打断。
3.3.2.close()
除非已经关闭,否则会向当前所有订阅者发出执行onComplete()方法的信号,并且不允许后续发布尝试。返回时,该方法不保证所有订阅者都已完成。该方法的执行会向所有订阅者发送的“可以执行onComplate()方法”的信号,但各个订阅者是否真正能够执行onComplate()方法,当前方法无法保证,与订阅者的具体执行情况相关。
3.4.反应式流编程(发布-订阅模式)

@Slf4j
public class SomeSubscriber implements Flow.Subscriber<Integer> {

    // 声明订阅令牌
    private Flow.Subscription subscription;

    /**
     * 当订阅关系确立时会触发该方法的执行,即publisher的subscriber()方法的执行
     * 会触发该方法的执行
     * @param subscription
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        log.info("------>执行订阅者的onSubscribe()方法<------");
        this.subscription = subscription;
        // 首次订阅10条
        this.subscription.request(10);
    }

    /**
     * 对消息的消费过程就在这
     * 该方法的第一次触发是由onSubscribe()方法中的this.subscription.request(10);触发
     * @param item
     */
    @Override
    public void onNext(Integer item) {
        log.info("------>订阅者正在消费该item<------" + item);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 订阅者没消费一条消息,就会在订阅5条
        this.subscription.request(5);
    }

    /**
     * 当发布者发布过程中,或订阅关系确立过程中,或者订阅者请求消息过程中,或消费者消费过程中出现异常过程中,
     * 则会触发该方法。
     * @param throwable
     */
    @Override
    public void onError(Throwable throwable) {
        log.info("------>执行onError()方法<------");
        throwable.printStackTrace();
        // 取消订阅关系
        this.subscription.cancel();
    }

    /**
     * 当所有消息消费完毕后会执行该方法
     */
    @Override
    public void onComplete() {
        log.info("------>发布者已关闭,订阅者已将消息全部消费完毕<------");
    }
}

@Slf4j
public class TestSubscriber {

    public static void main(String[] args) {
        // 创建发布者
        SubmissionPublisher<Integer> publisher = null;
        try {
            publisher = new SubmissionPublisher<>();
            // 创建订阅者
            SomeSubscriber subscriber = new SomeSubscriber();
            // 建立订阅关系
            publisher.subscribe(subscriber);
            // 生产消费
            for (int i = 0; i < 300; i++) {
                // 生成一个[0, 100)的随机数
                int item = new Random().nextInt(100);
                log.info("生产出第" + i + "条消息," + item);
                // 发布消息
                publisher.submit(item);
            }
        } finally {
            // 当所有消息发布完毕,关闭发布者
            if (publisher != null)
                publisher.close();
        }
        try {
            log.info("休眠:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:ss:mm")));
            TimeUnit.SECONDS.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
} 3.5.响应式流编程(发布-处理-订阅模式)

@Slf4j
public class SomeProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {
   
    // 声明订阅令牌
    private Flow.Subscription subscription;

    /**
     * 当订阅关系确立时会触发该方法的执行,即publisher的subscriber()方法的执行
     * 会触发该方法的执行
     * @param subscription
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        log.info("------>执行订阅者的onSubscribe()方法<------");
        this.subscription = subscription;
        // 首次订阅10条
        this.subscription.request(10);
    }

    /**
     * 对消息的消费过程就在这
     * 该方法的第一次触发是由onSubscribe()方法中的this.subscription.request(10);触发
     * @param item
     */
    @Override
    public void onNext(Integer item) {
        log.info("------>订阅者正在消费该item<------" + item);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 订阅者没消费一条消息,就会在订阅5条
        this.subscription.request(5);
    }

    /**
     * 当发布者发布过程中,或订阅关系确立过程中,或者订阅者请求消息过程中,或消费者消费过程中出现异常过程中,
     * 则会触发该方法。
     * @param throwable
     */
    @Override
    public void onError(Throwable throwable) {
        log.info("------>执行onError()方法<------");
        throwable.printStackTrace();
        // 取消订阅关系
        this.subscription.cancel();
    }

    /**
     * 当所有消息消费完毕后会执行该方法
     */
    @Override
    public void onComplete() {
        log.info("------>发布者已关闭,订阅者已将消息全部消费完毕<------");
    }
}
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|启明办公

Copyright © 2001-2013 Comsenz Inc.Template by Comsenz Inc.All Rights Reserved.

Powered by Discuz!X3.4

快速回复 返回顶部 返回列表