简介

Reactor 是 JVM 的完全非阻塞反应式编程基础,具有高效的需求管理(以管理 "背压" 的形式)。它直接与 Java 8 函数式 API 集成,特别是 CompletableFutureStreamDuration。它提供了可组合的异步序列 API( Flux 用于 [N] 个元素)和 Mono (用于 [0|1] 个元素)并广泛实现了响应式流规范

Reactor 还支持与 reactor-netty 项目的无阻塞进程间通信。Reactor Netty 适用于微服务架构,为 HTTP(包括 Websockets)、TCP 和 UDP 提供背压就绪网络引擎。完全支持反应式编码和解码(Reactor Core runs on Java 8 and above

如前所述,在核心中使用 Reactor 的最简单方法是使用 BOM 并将相关依赖项添加到项目中。请注意,添加此类依赖项时,必须省略版本,以便从 BOM 中选取版本。但是,如果要强制使用特定工件的版本,则可以像往常一样在添加依赖项时指定它。你还可以完全放弃 BOM,并按其工件版本指定依赖项。Maven 原生支持 BOM 概念。首先,您需要通过将以下代码片段添加到 BOM 中来导入 pom.xml BOM:

1
2
3
4
5
6
7
8
9
10
11
<dependencyManagement> 
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2023.0.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

接下来,像往常一样将依赖项添加到相关的 reactor 项目中,但不带 <version> ,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>

</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

Reactor 是响应式编程范式的一种实现,可以总结如下:

响应式编程是一种异步编程范式,涉及数据流和变化的传播。这意味着可以通过所采用的编程语言轻松表达静态(例如数组)或动态(例如事件发射器)数据流 — https://en.wikipedia.org/wiki/Reactive_programming

但是,为什么我们首先需要这样一个异步反应式库呢?主要有以下原因:

  • Blocking Can Be Wasteful

现代应用程序可以覆盖大量并发用户,尽管现代硬件的功能不断提高,但现代软件的性能仍然是一个关键问题。从广义上讲,有两种方法可以提高程序的性能:

  1. 并行化以使用更多线程和更多硬件资源
  2. 在当前资源的使用方式上寻求更高的效率

通常,Java 开发人员使用阻塞代码来编写程序。在出现性能瓶颈之前,这种做法是可以的。然后是时候引入其他线程,运行类似的阻塞代码了。但是,资源利用率的这种扩展会很快引入争用和并发问题。更糟糕的是,阻止会浪费资源。如果仔细观察,一旦程序涉及一些延迟(特别是 I/O,例如数据库请求或网络调用),资源就会被浪费,因为线程(可能是许多线程)现在处于空闲状态,等待数据。因此,并行化方法不是灵丹妙药。有必要访问硬件的全部功能,但推理也很复杂,并且容易浪费资源

  • Asynchronicity to the Rescue

前面提到的第二种方法,寻求更高的效率,可以解决资源浪费问题。通过编写异步、非阻塞代码,您可以让执行切换到使用相同基础资源的另一个活动任务,并在异步处理完成后返回到当前进程。但是,如何在 JVM 上生成异步代码呢?Java 提供了两种异步编程模型:

  1. Callbacks:异步方法没有返回值,但采用一个额外的 callback 参数(lambda 或匿名类),该参数在结果可用时被调用。一个众所周知的例子是 Swing EventListener 的层次结构
  2. Futures:步方法立即返回 Future<T> 异步进程计算一个 T 值,但 Future 对象包装对该值的访问。该值不会立即可用,在该值可用之前,可以轮询对象。例如,正在运行 Callable<T> 的任务 ExecutorService 使用 Future 对象

这些技术足够好吗?并非适用于每个用例,并且这两种方法都有局限性。回调很难组合在一起,很快就会导致代码难以阅读和维护(称为 "回调地狱")考虑一个示例:在 UI 上显示用户的前五个收藏夹,如果她没有收藏夹,则显示建议。这通过三个服务(一个提供收藏夹 ID,第二个获取收藏夹详细信息,第三个提供包含详细信息的建议),如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
userService.getFavorites(userId, new Callback<List<String>>() { 
public void onSuccess(List<String> list) {
if (list.isEmpty()) {
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) {
UiUtils.submitOnUiThread(() -> {
list.stream()
.limit(5)
.forEach(uiList::show);
});
}

public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
} else {
list.stream()
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId,
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}

public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}

public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});

这个代码说实话已经有点回调地狱那味儿了,让一段不是很复杂的逻辑变得很难读了。但是如果用 Reactor 写呢?

1
2
3
4
5
6
userService.getFavorites(userId) 
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);

这段代码就清晰多了,而且还能保持异步的特性,这就是 Reactor 的魅力所在!

背压

向上游传播信号也用于实现背压,我们在装配线类比中将其描述为当工作站比上游工作站处理速度慢时向上发送的反馈信号

Reactive Streams 规范定义的真正机制非常接近于类比:订阅者可以在无限模式下工作,让源以最快的速度推送所有数据,或者它可以使用该 request 机制向源发出信号,表明它已准备好处理大多数 n 元素

中间操作员还可以更改传输中的请求。想象一个 buffer 运算符,它以 10 个为一组对元素进行分组。如果订阅者请求一个缓冲区,则源可以生成十个元素。一些运算符还实施了预取策略,这些策略可以避免 request(1) 往返,如果在请求元素之前生成元素的成本不太高,则这是有益的

这样能够将 push 模式转换为 push-pull 混合的模式,如果下游准备好了,可以从上游拉取 n 个元素;但是如果上游元素还没有准备好,下游还是要等待上游的推送

Hot vs Cold

Rx 系列反应性文库区分了两大类反应性序列:热序列和冷序列。这种区别主要与反应式流对订阅者的反应方式有关:

  • 一个 "冷" 的序列,指对于每一个 Subscriber,都会收到从头开始所有的数据。如果源头 生成了一个 HTTP 请求,对于每一个订阅都会创建一个新的 HTTP 请求
  • 一个 "热" 的序列,指对于一个 Subscriber,只能获取从它开始订阅之后发出的数据。不过注意,有些 "热" 的响应式流可以缓存部分或全部历史数据。通常意义上来说,一个“热”的响应式流,甚至在即使没有订阅者接收数据的情况下,也可以 发出数据(这一点同 "Subscribe() 之前什么都不会发生" 的规则有冲突)

有关 Reactor 上下文中的热与冷的详细信息,请参阅 this reactor-specific section

Flux

对象 Flux 表示 0..N 项的反应序列。Flux<T> 是一个标准 Publisher<T>,表示 0 到 N 个发出项的异步序列,可以选择通过完成信号或错误终止。与 Reactive Streams 规范中一样,这三种类型的信号转换为对下游订阅者的 onNextonCompleteonError 方法的调用

具有如此大范围的可能信号,Flux 是通用的无功型。请注意,所有事件,甚至是终止事件,都是可选的:没有 onNext 事件,但 onComplete 事件表示一个空的有限序列,但删除 onComplete 你有一个无限的空序列(不是特别有用,除了围绕取消的测试)。同样,无限序列也不一定是空的。例如,Flux.interval(Duration) 生成一个 Flux<Long> 无限的,并从时钟发出有规律的滴答声

1
2
3
4
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

Mono

Mono 对象表示单值或空(0..1)结果。Mono<T> 它通过 Publisher<T> 信号最多发射一个项目,然后以一个信号终止(成功 Mono,有或没有值),或只发出一个 onError onNext onComplete 信号(失败 Mono )

大多数 Mono 实现在调用后应立即调用 onComplete onNext 它们 Subscriber。Mono.never() 是一个异常值:它不发出任何信号,这在技术上并不被禁止,尽管在测试之外不是很有用。另一方面,明确禁止 onNext 和 onError 的组合

1
2
3
Mono<String> noData = Mono.empty(); 

Mono<String> data = Mono.just("foo");

subscribe

subscribe 操作符用来订阅流中的元素。当流中的元素没有被订阅的时候,所有的操作都不会触发,只有当流中的元素被订阅的时候,所有的操作才会触发。面的代码显示了不带参数的基本方法的示例:

1
2
Flux<Integer> ints = Flux.range(1, 3); 
ints.subscribe();

前面的代码不生成可见的输出,但它确实有效。生成 Flux 三个值。如果我们提供一个 lambda,我们可以使值可见。该 subscribe 方法的下一个示例显示了使值显示的一种方法:

1
2
Flux<Integer> ints = Flux.range(1, 3); 
ints.subscribe(i -> System.out.println(i));
subscribe 还可以定义错误消费者接口,用来消费流中的错误:
1
2
3
4
5
6
7
Flux<Integer> ints = Flux.range(1, 4) 
.map(i -> {
if (i <= 3) return i;
throw new RuntimeException("Got to 4");
});
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error: " + error));

subscribe 方法还可以同时包括错误处理程序和完成事件的处理程序,如以下示例所示:

1
2
3
4
Flux<Integer> ints = Flux.range(1, 4); 
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error),
() -> System.out.println("Done"));

所有这些基于 lambda 的 subscribe() 变体都具有 Disposable 返回类型。在这种情况下,接口 Disposable 表示可以通过调用其 dispose() 方法取消订阅

对于 Flux 或 Mono ,取消是源应停止生成元素的信号。但是,不能保证它是即时的:某些源可能会生成元素的速度很快,以至于它们甚至可以在收到取消指令之前完成

BaseSubscriber

还有一种更通用的附加 subscribe 方法,它采用成熟的方法 Subscriber,而不是从 lambda 中组合出一个。为了帮助编写这样的类 Subscriber,我们提供了一个可扩展的类,称为 BaseSubscriber

BaseSubscriber(或它的子类)是一次性的,这意味着如果一个 BaseSubscriber 订阅了第二个 Publisher 实例,则取消其对第一个 Publisher 实例的订阅。这是因为使用一个实例两次会违反 Reactive Streams 规则,即 onNext Subscriber 的方法不能并行调用。因此,仅当匿名实现直接在对 的 Publisher#subscribe(Subscriber) 调用中声明时才适用

1
2
3
SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);

以下示例显示了 SampleSubscriber BaseSubscriber:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package io.projectreactor.samples;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

@Override
public void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1);
}

@Override
public void hookOnNext(T value) {
System.out.println(value);
request(1);
}
}

BaseSubscriber 还提供了一种切换到无界模式 requestUnbounded() 的方法(等效于 request(Long.MAX_VALUE)),以及一种方法 cancel()。它还具有其他钩子:hookOnCompletehookOnError hookOnCancel 和 hookFinally(在序列终止时始终调用,终止类型作为 SignalType 参数传入)

自定义原始请求的最简单方法是 subscribe 使用 BaseSubscriber with the hookOnSubscribe method overrid,如以下示例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Flux.range(1, 10)
.doOnRequest(r -> System.out.println("request of " + r))
.subscribe(new BaseSubscriber<Integer>() {

@Override
public void hookOnSubscribe(Subscription subscription) {
request(1);
}

@Override
public void hookOnNext(Integer integer) {
System.out.println("Cancelling after having received " + integer);
cancel();
}
});

limitRate / limitRequest

在 Reactor 3.x 中,你可以使用 limitRate 或 limitRequest 操作符来控制流的速率。limitRate 操作符是基于每秒发出的信号总数来限制流的速率。例如,limitRate(10) 表示每秒只允许通过 10 个信号。limitRequest 操作符是基于总共发出的信号数来限制流的速率。例如,limitRequest(100) 表示只允许通过 100 个信号。这两个操作符可以用于控制流的速率,以避免生产者过快地向消费者发送数据。可以根据实际需求选择其中之一来限制流的速率

1
2
3
Flux.range(1, 10)
.limitRate(2)
.subscribe(System.out::println);
1
2
3
Flux.range(1, 10)
.limitRequest(3)
.subscribe(System.out::println);

这些运算符通常还会实施补货优化:一旦运算符看到 75% 的预取请求得到满足,它就会从上游重新请求 75%。这是一种启发式优化,以便这些操作员主动预测即将到来的请求

generate

最简单的创建 Flux 的方式就是使用 generate 方法。这是一种 同步地、逐个地 产生值的方法,意味着 sink 是一个 SynchronousSink 而且其 next() 方法在每次回调的时候最多只能被调用一次。你也可以调用 error(Throwable) 或者 complete(),不过是可选的。

最有用的一种方式就是同时能够记录一个状态值(state),从而在使用 sink 发出下一个元素的时候能够 基于这个状态值去产生元素。此时生成器(generator)方法就是一个 BiFunction<S, SynchronousSink<T>, S>, 其中 <S> 是状态对象的类型。你需要提供一个 Supplier<S> 来初始化状态值,而生成器需要 在每一 "回合" 生成元素后返回新的状态值(供下一回合使用)

例如我们使用一个 int 作为状态值。基于状态值的 generate 示例

1
2
3
4
5
6
7
Flux<String> flux = Flux.generate(
() -> 0,
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state);
if (state == 10) sink.complete();
return state + 1;
});

您还可以使用可变的 <S> 例如,上面的例子可以使用单个 AtomicLong 作为状态来重写,并在每一轮中对其进行更改:

1
2
3
4
5
6
7
8
Flux<String> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state;
});

以下示例使用 generate 包含:Consumer

1
2
3
4
5
6
7
8
Flux<String> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next("3 x " + i + " = " + 3 * i);
if (i == 10) sink.complete();
return state;
}, (state) -> System.out.println("state: " + state));

如果 state 使用了数据库连接或者其他需要最终进行清理的资源,这个 Consumer lambda 可以用来在最后关闭连接或完成相关的其他清理任务

create

作为一个更高级的创建 Flux 的方式,create 方法的生成方式既可以是同步,也可以是异步的,并且还可以每次发出多个元素。该方法用到了 FluxSink,后者同样提供 nexterrorcomplete 等方法。与 generate 不同的是,create 不需要状态值,另一方面,它可以在回调中触发多个事件(即使是在未来的某个时间)

create 有个好处就是可以将现有的 API 转为响应式,比如监听器的异步方法

假设你有一个监听器 API,它按 chunk 处理数据,有两种事件:(1)一个 chunk 数据准备好的事件;(2)处理结束的事件。如下:

1
2
3
4
interface MyEventListener<T> {
void onDataChunk(List<T> chunk);
void processComplete();
}

你可以使用 create 方法将其转化为响应式类型 Flux<T>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {

public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}

public void processComplete() {
sink.complete();
}
});
});

此外,既然 create 可以是异步地,并且能够控制背压,你可以通过提供一个 OverflowStrategy 来定义背压行为

  • IGNORE: 完全忽略下游背压请求,这可能会在下游队列积满的时候导致 IllegalStateException
  • ERROR: 当下游跟不上节奏的时候发出一个 IllegalStateException 的错误信号
  • DROP:当下游没有准备好接收新的元素的时候抛弃这个元素
  • LATEST:让下游只得到上游最新的元素
  • BUFFER:(默认的)缓存所有下游没有来得及处理的元素(这个不限大小的缓存可能导致 OutOfMemoryError

Mono 也有一个用于 create 的生成器(generator)—— MonoSink,它不能生成多个元素, 因此会抛弃第一个元素之后的所有元素

push

create 的一个变体是 push,适合生成事件流。与 create 类似,push 也可以是异步地,并且能够使用以上各种溢出策略(overflow strategies)管理背压。每次只有一个生成线程可以调用 nextnexterror

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Flux<String> bridge = Flux.push(sink -> {
myEventProcessor.register(
new SingleThreadEventListener<String>() {

public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}

public void processComplete() {
sink.complete();
}

public void processError(Throwable e) {
sink.error(e);
}
});
});

不像 push,create 可以用于 push 或 pull 模式,因此适合桥接监听器的的 API,因为事件消息会随时异步地到来。回调方法 onRequest 可以被注册到 FluxSink 以便跟踪请求。这个回调可以被用于从源头请求更多数据,或者通过在下游请求到来的时候传递数据给 sink 以实现背压管理。这是一种推送 / 拉取混合的模式,因为下游可以从上游拉取已经就绪的数据,上游也可以在数据就绪的时候将其推送到下游

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Flux<String> bridge = Flux.create(sink -> {
myMessageProcessor.register(
new MyMessageListener<String>() {

public void onMessage(List<String> messages) {
for(String s : messages) {
sink.next(s);
}
}
});
sink.onRequest(n -> {
List<String> messages = myMessageProcessor.request(n);
for(String s : message) {
sink.next(s);
}
});

onDispose 和 onCancel 这两个回调用于在被取消和终止后进行清理工作。onDispose 可用于在 Flux 完成,有错误出现或被取消的时候执行清理。onCancel 只用于针对 "取消" 信号执行相关操作,会先于 onDispose 执行

1
2
3
4
5
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close())
});

Handle

handle 方法有些不同,它在 Mono 和 Flux 中都有。然而,它是一个实例方法 (instance method),意思就是它要链接在一个现有的源后使用(与其他操作符一样)。它与 generate 比较类似,因为它也使用 SynchronousSink,并且只允许元素逐个发出。然而 handle 可被用于基于现有数据源中的元素生成任意值,有可能还会跳过一些元素。 这样,可以把它当做 map 与 filter 的组合。handle 方法签名如下:

1
handle(BiConsumer<T, SynchronousSink<R>>)

举个例子,响应式流规范允许 null 这样的值出现在序列中。假如你想执行一个类似 map 的操作,你想利用一个现有的具有映射功能的方法,但是它会返回 null,这时候怎么办呢?例如,下边的方法可以用于 Integer 序列,映射为字母或 null:

1
2
3
4
5
6
7
public String alphabet(int letterNumber) {
if (letterNumber < 1 || letterNumber > 26) {
return null;
}
int letterIndexAscii = 'A' + letterNumber - 1;
return "" + (char) letterIndexAscii;
}

我们可以使用 handle 来去掉其中的 null。将 handle 用于一个 "映射 + 过滤 null" 的场景

1
2
3
4
5
6
7
8
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
.handle((i, sink) -> {
String letter = alphabet(i);
if (letter != null)
sink.next(letter);
});

alphabet.subscribe(System.out::println);

Schedulers

Reactor, 就像 RxJava,也可以被认为是 并发无关(concurrency agnostic)的。意思就是,它并不强制要求任何并发模型。更进一步,它将选择权交给开发者。不过,它还是提供了一些方便 进行并发执行的库

在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler。Scheduler 是一个拥有广泛实现类的抽象接口。Schedulers 类提供的静态方法用于达成如下的执行环境:

  • 当前线程(Schedulers.immediate()
  • 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用,直到该调度器(Scheduler)被废弃。如果你想使用专一的线程,就对每一个调用使用 Schedulers.newSingle()
  • 弹性线程池(Schedulers.elastic())。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长(默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。Schedulers.elastic() 能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源
  • 固定大小线程池(Schedulers.parallel())。所创建线程池的大小与 CPU 个数等同

此外,你还可以使用 Schedulers.fromExecutorService(ExecutorService) 基于现有的 ExecutorService 创建 Scheduler(虽然不太建议,不过你也可以使用 Executor 来创建)。你也可以使用 newXXX 方法来创建不同的调度器。比如 Schedulers.newElastic(yourScheduleName) 创建一个新的名为 yourScheduleName 的弹性调度器

一些操作符默认会使用一个指定的调度器(通常也允许开发者调整为其他调度器)例如,通过工厂方法 Flux.interval(Duration.ofMillis(300)) 生成的每 300ms 打点一次的 Flux<Long>, 默认情况下使用的是 Schedulers.parallel(),下边的代码演示了如何将其装换为 Schedulers.single()

1
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

Reactor 提供了两种在响应式链中调整调度器 Scheduler 的方法:publishOnsubscribeOn。它们都接收一个 Scheduler 作为参数,从而可以改变调度器。但是 publishOn 在链中出现的位置 是有讲究的,而 subscribeOn 则无所谓。要理解它们的不同,你首先要理解 nothing happens until you subscribe()

publishOn

publishOn 与任何其他运营商一样,在订阅者链的中间应用。它从上游获取信号并在下游重放信号,同时对来自关联 Scheduler。因此,它会影响后续运算符的执行位置(直到另一个 publishOn 运算符被链接进来),如下所示:

  • 将执行上下文 Thread 更改为由 Scheduler
  • 根据规范,调用是按顺序进行的,onNext 因此这会占用单个线程
  • 除非它们在特定的 Scheduler,运算符上工作,否则在同一线程上 publishOn 继续执行

下面的示例使用该 publishOn 方法:

1
2
3
4
5
6
7
8
9
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); 

final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.publishOn(s)
.map(i -> "value " + i);

new Thread(() -> flux.subscribe(System.out::println));

subscribeOn

subscribeOn 适用于构建后向链时的订阅过程。通常建议将其放在数据源之后,因为中间运算符可能会影响执行的上下文。但是,这不会影响后续调用的行为 publishOn 它们仍然会切换其后面的链部分的执行上下文。下面的示例使用该 subscribeOn 方法:

1
2
3
4
5
6
7
8
9
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); 

final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.subscribeOn(s)
.map(i -> "value " + i);

new Thread(() -> flux.subscribe(System.out::println));

错误处理

在响应式流中,错误(error)是终止(terminal)事件。当有错误发生时,它会导致流序列停止,并且错误信号会沿着操作链条向下传递,直至遇到你定义的 Subscriber 及其 onError 方法

这样的错误还是应该在应用层面解决的。比如,你可能会将错误信息显示在用户界面,或者通过某个 REST 端点(endpoint)发出。因此,订阅者(subscriber)的 onError 方法是应该定义的

如果没有定义,onError 会抛出 UnsupportedOperationException。你可以接下来再检测错误,并通过 Exceptions.isErrorCallbackNotImplemented 方法捕获和处理它

Reactor 还提供了其他的用于在链中处理错误的方法,即错误处理操作(error-handling operators)

1
2
3
Flux.just(1, 2, 0)
.map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
.onErrorReturn("Divided by zero :("); // error handling example

在你了解错误处理操作符之前,你必须牢记响应式流中的任何错误都是一个终止事件。即使用了错误处理操作符,也不会让源头流序列继续。而是将 onError 信号转化为一个新的序列的开始。换句话说,它代替了被终结的上游流序列

onErrorReturn

捕获并返回静态默认值。以下示例演示如何使用它:

1
2
3
4
5
6
7
8
9
10
11
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn("RECOVERED");

// 等价于
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}

你还可以通过判断错误信息的内容,来筛选哪些要给出缺省值,哪些仍然让错误继续传递下去:

1
2
3
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");

onErrorComplete

如果你甚至不想将异常替换为回退值,而是忽略它并仅传播到目前为止已生成的元素,那么你想要的实质上是将信号替换为 onComplete 信号

1
2
3
Flux.just(10,20,30)
.map(this::doSomethingDangerousOn30)
.onErrorComplete();

onErrorResume

如果你不只是想要在发生错误的时候给出缺省值,而是希望提供一种更安全的处理数据的方式, 可以使用 onErrorResume。假设,你会尝试从一个外部的不稳定服务获取数据,但仍然会在本地缓存一份可能有些过期的数据, 因为缓存的读取更加可靠。可以这样来做:

1
2
3
Flux.just("key1", "key2")
.flatMap(k -> callExternalService(k))
.onErrorResume(e -> getFromCache(k));

就像 onErrorReturnonErrorResume 也有可以用于预先过滤错误内容的方法变体,可以基于异常类或 Predicate 进行过滤。它实际上是用一个 Function 来作为参数,还可以返回一个新的流序列

1
2
3
4
5
6
7
8
9
10
Flux.just("timeout1", "unknown", "key2")
.flatMap(k -> callExternalService(k))
.onErrorResume(error -> {
if (error instanceof TimeoutException)
return getFromCache(k);
else if (error instanceof UnknownKeyException)
return registerNewEntry(k, "DEFAULT");
else
return Flux.error(error);
});

有时候并不想提供一个错误处理方法,而是想在接收到错误的时候计算一个候补的值(捕获并动态计算一个候补值)。例如,如果你的返回类型本身就有可能包装有异常(比如 Future.complete(T success) vs Future.completeExceptionally(Throwable error)),你有可能使用流中的错误包装起来实例化 返回值。这也可以使用上一种错误处理方法的方式(使用 onErrorResume)解决,代码如下:

1
2
3
erroringFlux.onErrorResume(error -> Mono.just( 
myWrapper.fromError(error)
));

onErrorMap

1
2
3
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));

doOnError

如果对于错误你只是想在不改变它的情况下做出响应(如记录日志),并让错误继续传递下去, 那么可以用 doOnError 方法。如下边的例子所示,我们会记录错误日志,并且还通过变量自增统计错误发生个数:

1
2
3
4
5
6
7
8
9
LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
.flatMap(k -> callExternalService(k))
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k);
})
.onErrorResume(e -> getFromCache(k));

using / doFinally

最后一个要与命令式编程对应的对比就是使用 Java 7 try-with-resources 或 finally 代码块清理资源。在 Reactor 中都有对应的方法:usingdoFinally

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
@Override
public void dispose() {
isDisposed.set(true);
}

@Override
public String toString() {
return "DISPOSABLE";
}
};

Flux<String> flux =
Flux.using(
() -> disposableInstance,
disposable -> Flux.just(disposable.toString()),
Disposable::dispose
);

另一方面,doFinally 在序列终止(无论是 onCompleteonError 还是取消)的时候被执行,并且能够判断是什么类型的终止事件(完成、错误还是取消)

1
2
3
4
5
6
7
8
9
LongAdder statsCancel = new LongAdder(); 

Flux<String> flux =
Flux.just("foo", "bar")
.doFinally(type -> {
if (type == SignalType.CANCEL)
statsCancel.increment();
})
.take(1);

onError

演示终止方法 onError。为了演示当错误出现的时候如何导致上游序列终止,我们使用 Flux.interval 构造一个更加直观的例子。这个 interval 操作符会在每 x 单位的时间发出一个自增的 Long 值

1
2
3
4
5
6
7
8
9
10
Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.onErrorReturn("Uh oh");

flux.subscribe(System.out::println);
Thread.sleep(2100);

即使多给了 1 秒钟时间,也没有更多的 tick 信号由 interval 产生了,所以序列确实被错误信号终止了

retry

还有一个用于错误处理的操作符你可能会用到,就是 retry,见文知意,用它可以对出现错误的序列进行重试。问题是它对于上游 Flux 是基于重订阅(re-subscribing)的方式。这实际上已经是一个不同的序列了,发出错误信号的序列仍然是终止了的。为了验证这一点,我们可以在继续用上边的例子,增加一个 retry(1) 代替 onErrorReturn 来重试一次

1
2
3
4
5
6
7
8
9
10
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.elapsed()
.retry(1)
.subscribe(System.out::println, System.err::println);

Thread.sleep(2100);

可见,retry(1) 不过是再一次从新订阅了原始的 interval,从 tick 0 开始。第二次,由于异常再次出现,便将异常传递到下游了

还有一个 "高配版" 的 retry(retryWhen),它使用一个伴随(companion) Flux 来判断对某次错误是否要重试。这个伴随 Flux 是由操作符创建的,但是由开发者包装它,从而实现对重试操作的配置

这个伴随 Flux 是一个 Flux<Throwable>,它作为 retryWhen 的唯一参数被传递给一个 Function,你可以定义这个 Function 并让它返回一个新的 Publisher<?>。重试的循环 会这样运行:

  • 每次出现错误,错误信号会发送给伴随 Flux,后者已经被你用 Function 包装
  • 如果伴随 Flux 发出元素,就会触发重试
  • 如果伴随 Flux 完成(complete),重试循环也会停止,并且原始序列也会 完成(complete)
  • 如果伴随 Flux 产生一个错误,重试循环停止,原始序列也停止 或 完成,并且这个错误会导致 原始序列失败并终止

了解前两个场景的区别是很重要的。如果让伴随 Flux 完成(complete)等于吞掉了错误。如下代码用 retryWhen 模仿了 retry(3) 的效果:

1
2
3
4
Flux<String> flux = Flux
.<String>error(new IllegalArgumentException())
.doOnError(System.out::println)
.retryWhen(companion -> companion.take(3));

事实上,上边例子最终得到的是一个空的 Flux,但是却成功完成了。反观对同一个 Flux 调用 retry(3) 的话,最终是以最后一个 error 终止 Flux,故而 retryWhen 与之不同。实现同样的效果需要一些额外的技巧:

1
2
3
4
5
6
7
8
9
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.retryWhen(companion -> companion
.zipWith(Flux.range(1, 4),
(error, index) -> {
if (index < 4) return index;
else throw Exceptions.propagate(error);
})
);

Sinks

在 Reactor 中,接收器是一个允许以独立方式安全手动触发信号的类,创建一个 Publisher 能够处理多个 Subscriber 的类似结构

在此之前 3.5.0 ,还有一组 Processor 实现已经逐步淘汰

  • Sink.one():该 sink 用于接收单个元素,并立即完成
1
2
3
4
5
Mono<String> mono = Mono.just("Hello");
Sink<String, Mono<String>> sink = Sink.one();
mono.subscribe(sink);
Mono<String> result = sink.asMono();
// result 中包含 "Hello"
  • Sink.many():用于处理多个元素的场景,它可以先通过调用 SinkBuffer.create() 创建一个 SinkBuffer 对象,然后将 SinkBuffer 转换为 Flux 流输出
1
2
3
4
5
6
7
8
9
10
Long[] elements = { 1L, 2L, 3L, 4L };
Sink<Long, Flux<Long>> sink = Sink.many().multicast().directBestEffort();
SinkBuffer<Long> buffer = sink.asBuffer();
for (Long element : elements) {
buffer.tryEmitNext(element);
}
buffer.tryEmitComplete();

Flux<Long> result = sink.asFlux();
result.subscribe(System.out::println);
  • Sink.empty():该 sink 用于立即完成,而不接收任何输入元素
1
2
3
Sink<Object, Mono<Object>> sink = Sink.empty();
Mono<Object> result = sink.asMono();
// result 完成,并没有任何值
  • Sink.ignoreElements():该 sink 用于消费并丢弃所有输入元素,而不进行任何输出
1
2
3
4
5
Flux<Integer> flux = Flux.range(1, 5);
Sink<Integer, Mono<Void>> sink = Sink.ignoreElements();
flux.subscribe(sink);
Mono<Void> result = sink.asMono();
// result 完成,没有任何值
  • Sink.collectList(): 该 sink 用于将元素累积到一个列表中,并作为单个项目发出
1
2
3
4
5
Flux<Integer> flux = Flux.range(1, 5);
Sink<Integer, Mono<List<Integer>>> sink = Sink.collectList();
flux.subscribe(sink);
Mono<List<Integer>> result = sink.asMono();
// result 中包含 [1, 2, 3, 4, 5]
  • Sink.collectMap(): 该 sink 用于将元素累积到一个 Map 中,并作为单个项目发出
1
2
3
4
5
Flux<String> flux = Flux.just("apple", "banana", "cherry");
Sink<String, Mono<Map<String, Integer>>> sink = Sink.collectMap(item -> item, String::length);
flux.subscribe(sink);
Mono<Map<String, Integer>> result = sink.asMono();
// result 中包含 {"apple"=5, "banana"=6, "cherry"=6}

log

除了基于 stack trace 的调试和分析,还有一个有效的工具可以跟踪异步序列并记录日志。就是 log() 操作符。将其加到操作链上之后,它会读(只读,peek)每一个 在其上游的 Flux 或 Mono 事件(包括 onNextonErroronComplete,以及 订阅、取消和请求)

关于 logging 的具体实现

log 操作符通过 SLF4J 使用类似 Log4J 和 Logback 这样的公共的日志工具, 如果 SLF4J 不存在的话,则直接将日志输出到控制台。控制台使用 System.err 记录 WARN 和 ERROR 级别的日志,使用 System.out 记录其他级别的日志。如果你喜欢使用 JDK java.util.logging,在 3.0.x 你可以设置 JDK 的系统属性 reactor.logging.fallback

例如,假设我们已经激活并配置了 Logback,并且有一个像 range(1,10).take(3) 通过在之前放置一个 log(),我们可以深入了解它是如何工作的,以及它向上游传播到范围的事件类型 take,如以下示例所示:

1
2
3
4
Flux<Integer> flux = Flux.range(1, 10)
.log()
.take(3);
flux.subscribe();

这将打印出以下内容(通过记录器的控制台 appender):

1
2
3
4
5
6
10:45:20.200 [main] INFO  reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(3)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO reactor.Flux.Range.1 - | cancel()

transform

从代码整洁的角度来说,重用代码是一个好办法。Reactor 提供了几种帮你打包重用代码的方式,主要通过使用操作符或者常用的 "操作符组合" 的方法来实现。如果你觉得一段操作链很常用,你可以将这段操作链打包封装后备用

transform 操作符可以将一段操作链封装为一个函数式(function)。这个函数式能在操作期(assembly time)将被封装的操作链中的操作符还原并接入到调用 transform 的位置。这样做和直接将被封装的操作符加入到链上的效果是一样的。示例如下:

1
2
3
4
5
6
7
8
Function<Flux<String>, Flux<String>> filterAndMap =
f -> f.filter(color -> !color.equals("orange"))
.map(String::toUpperCase);

Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.doOnNext(System.out::println)
.transform(filterAndMap)
.subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));

上边例子的输出如下:

1
2
3
4
5
6
7
blue
Subscriber to Transformed MapAndFilter: BLUE
green
Subscriber to Transformed MapAndFilter: GREEN
orange
purple
Subscriber to Transformed MapAndFilter: PURPLE