Spring WebFlux

概述

Spring 5 是继 Spring 4 之后将近四年内的一个重大的版本升级。Spring 5 中最重要改动是把反应式编程的思想应用到了框架的各个方面,Spring 5 的反应式编程以 Reactor 库为基础。

Spring WebFlux 是 Spring 5 的反应式核心,名称中的 Flux 来源于 Reactor 中的类 Flux。Spring WebFlux 模块中包含了对反应式 HTTP、服务器推送事件和 WebSocket 的客户端和服务器端的支持。WebFlux 需要底层提供运行时的支持,WebFlux 可以运行在支持 Servlet 3.1 非阻塞 IO API 的 Servlet 容器上,或者其他异步运行时环境,如 Netty 和 Undertow。

同步Servlet和异步Servlet

在 Servlet 3.0 之前,Servlet采用One-Request-Per-Thread的方式处理请求,即每一个Http请求都由某一个线程从头到尾负责处理。如果一个请求需要进行IO操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将同步地等待IO操作完成,而IO操作是非常慢的,所以此时的线程并不能及时地释放回线程池以供后续使用,在并发量越来越大的情况下,这将带来严重的性能问题。即便是像SpringMVC、Struts这样的框架也脱离不了这样的桎梏,因为它们都是建立在Servlet之上的。为了解决这样的问题,Servlet 3.0 引入了异步处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
@WebServlet("/blocking-sync")
public class BlockingSyncServlet extends HttpServlet {

@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
long startTime = System.currentTimeMillis();
doExecute();
long endTime = System.currentTimeMillis();
log.debug("{}: Execute completed in {} ms", this, endTime - startTime);
}

private void doExecute() {
try {
// 模拟业务处理耗时操作
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ignored) {
}
}

}

在 Servlet 3.0 中,我们可以从HttpServletRequest对象中获得一个AsyncContext对象,该对象构成了异步处理的上下文,Request和Response对象都可从中获取。AsyncContext可以从当前线程传给另外的线程,并在新的线程中完成对请求的处理并返回结果给客户端,初始线程便可以还回给容器线程池以处理更多的请求。如此,通过将请求从一个线程传给另一个线程处理的过程便构成了 Servlet 3.0 中的异步处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
@WebServlet(urlPatterns = "/blocking-async", asyncSupported = true)
public class BlockingAsyncServlet extends HttpServlet {

@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
long startTime = System.currentTimeMillis();
// 开启异步模式: Servlet线程不再是一直处于阻塞状态以等待业务逻辑的处理, 而是启动异步线程之后线程本身返回至容器
AsyncContext ctx = request.startAsync();
// 请求异步处理
doExecute(ctx);
long endTime = System.currentTimeMillis();
log.debug("{}: Execute completed in {} ms", this, endTime - startTime);
}

/**
* 直接调用AsyncContext的start()方法会向Servlet容器另外申请一个新的线程, 这种方式对性能的改进不大,
* 因为如果新的线程和初始线程共享同一个线程池的话, 相当于闲置下了一个线程, 但同时又占用了另一个线程.
*/
private void doExecute(AsyncContext ctx) {
CompletableFuture.runAsync(() -> {
try {
// 模拟业务处理耗时操作
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ex) {
// Ignored
} finally {
// 处理完毕后需要调用complete()方法告知Servlet容器
ctx.complete();
}
});
}

}

Servlet 3.0 对请求的处理虽然是异步的,但是对InputStream和OutputStream的IO操作却依然是阻塞的,对于数据量大的请求体或者返回体,阻塞IO也将导致不必要的等待。因此在 Servlet 3.1 中又引入了非阻塞IO来进一步增强异步处理的性能,通过在HttpServletRequest和HttpServletResponse中分别添加ReadListener和WriterListener方式,只有在IO数据满足一定条件时(比如数据准备好时)才进行后续的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Slf4j
@WebServlet(urlPatterns = "/non-blocking-async", asyncSupported = true)
public class NonBlockingAsyncServlet extends HttpServlet {

@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException {
long startTime = System.currentTimeMillis();
AsyncContext ctx = request.startAsync();
request.getInputStream()
.setReadListener(new HttpRequestReadListener(ctx));
long endTime = System.currentTimeMillis();
log.debug("{}: Execute completed in {} ms", this, endTime - startTime);
}

private static class HttpRequestReadListener implements ReadListener {

private final AsyncContext asyncContext;

private HttpRequestReadListener(AsyncContext asyncContext) {
this.asyncContext = asyncContext;
}

@Override
public void onDataAvailable() {
}

@Override
public void onAllDataRead() {
CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ex) {
// Ignored
} finally {
asyncContext.complete();
}
});
}

@Override
public void onError(Throwable t) {
asyncContext.complete();
}

}

}

WebSocket和SSE

服务器端数据推送技术除了WebSocket外还有Server-sent Events(简称SSE),WebSocket 规范和 Server-sent Events 都是 HTML5 标准的组成部分,在主流浏览器上都提供了原生的支持,都是推荐使用的。WebSocket适用于需要进行复杂双向数据通讯的场景,对于简单的服务器数据推送的场景,使用服务器推送事件就足够了。

正如名称所表示的一样,WebSocket使用的是套接字连接,基于TCP协议。使用WebSocket之后,实际上是在服务器端和浏览器之间建立一个套接字连接,可以进行双向的数据传输。WebSocket功能强大,使用起来也灵活,可以适用于不同的场景。除了WebSocket之外,其他的实现方式是基于HTTP协议来达到实时推送的效果。

严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息。也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。

总体来说,WebSocket 更强大和灵活,可以双向通信;SSE 是单向通信,只能服务器向浏览器发送,因为流信息本质上就是下载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@WebServlet("/sse")
public class SseServlet extends HttpServlet {

private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
long startTime = System.currentTimeMillis();
doExecute(response);
long endTime = System.currentTimeMillis();
System.out.println(String.format("%s >> Execute completed in %s ms", this, endTime - startTime));
}

private void doExecute(HttpServletResponse response) throws IOException {
response.setContentType(TEXT_EVENT_STREAM_VALUE);
response.setCharacterEncoding(StandardCharsets.UTF_8.name());
PrintWriter out = response.getWriter();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
out.write("event: myEvent\n");
out.write("retry: 10000\n");
out.write("data: " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "\n\n");
out.flush();
}

}
1
2
3
4
5
6
7
8
9
var sse = new EventSource('sse');
// 默认情况下, 服务器发来的数据, 总是触发浏览器EventSource实例的message事件
sse.addEventListener('message', function (e) {
console.log(e.data);
});
// 可自定义 SSE 事件, 这种情况下, 发送回来的数据不会触发message事件
sse.addEventListener('myEvent', function (e) {
console.log(e.data);
});

Reactive Stream

流 Stream 是什么?流是序列,是生产者生产、一个或多个消费者消费的元素序列。这种具体的设计模式称为发布订阅模式。常见的流处理机制是 pull / push 模式。背压是一种常用策略,使得发布者拥有无限制的缓冲区存储元素,用于确保发布者发布元素太快时,不会去压制订阅者。

Reactive Streams是提供处理非阻塞背压异步流的一种标准,JDK9提供了 FlowSubmissionPublisher 两个主要的 API 来处理响应流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Slf4j
public class ReactiveStreamTest {

@Test
public void testReactiveStream() throws InterruptedException {
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
Subscriber<Integer> subscriber = new ConsumerSubscriber<>();
publisher.subscribe(subscriber);
// submit是阻塞方法(当发布者发布数据时, 如果订阅者的数据缓冲数组已满, 则submit会被阻塞, 从而实现调节生产者发布数据的速度)
IntStream.rangeClosed(1, 1000).forEach(i -> {
log.debug("====== 生产数据 {} ======", i);
publisher.submit(i);
});
publisher.close();
Thread.currentThread().join(TimeUnit.SECONDS.toMillis(60));
}

private static class ConsumerSubscriber<T> implements Subscriber<T> {

private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}

@Override
public void onNext(T item) {
log.debug("====== 消费数据 {} ======", item);
try {
// 模拟业务处理耗时
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
this.subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
log.error("出现异常啦", throwable);
this.subscription.cancel();
}

@Override
public void onComplete() {
log.info("数据全部处理完成");
}

}

}

WebMVC VS WebFlux

WebMVC是同步阻塞的IO模型,而WebFlux是异步非阻塞的IO模型。

Reactor = JDK8 Stream + JDK9 Reactive Stream

WebMVC VS WebFlux

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Slf4j
@RestController
public class ReactorResource {

@GetMapping("webmvc")
public String webmvc() {
long startTime = System.currentTimeMillis();
String currTime = this.handle();
long endTime = System.currentTimeMillis();
log.info("WebMVC >> Execute completed in {} ms", endTime - startTime);
return currTime;
}

@GetMapping("webflux")
public Mono<String> webflux() {
long startTime = System.currentTimeMillis();
Mono<String> mono = Mono.fromSupplier(this::handle); // 中间操作, 没有执行最终操作, 不会阻塞
long endTime = System.currentTimeMillis();
log.info("WebFlux >> Execute completed in {} ms", endTime - startTime);
return mono;
}

@GetMapping(value = "stream", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<Integer> stream() {
long startTime = System.currentTimeMillis();
Flux<Integer> flux = Flux.fromStream(IntStream.rangeClosed(1, 10).mapToObj(i -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
return i;
}));
long endTime = System.currentTimeMillis();
log.info("Stream >> Execute completed in {} ms", endTime - startTime);
return flux;
}

private String handle() {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException ignored) {
}
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}

}