Reactor 拾遗

publishOn与subscribeOn

Reactor提供了两种在响应式链中调整调度器Scheduler的方法:publishOn和subscribeOn。它们都接受一个Scheduler作为参数,从而可以改变调度器。

  • publishOn的用法和处于订阅链(subscriber-chain)中的其他操作符一样。它将上游信号传给下游,同时执行指定的调度器Scheduler的某个工作线程上的回调。它会改变后续操作符的执行线程(直到下一个publishOn出现在这个链上)。
  • subscribeOn用于订阅(subscription)过程,作用于向上的订阅链(发布者在被订阅时才激活,订阅的传递方向是向上游的)。所以,无论你把subscribeOn至于操作链的什么位置,它都会影响到源头的线程执行环境(context)。但是,它不会影响后续的publishOn,后者仍能够切换其后操作符的线程执行环境。

Flux和Mono不会创建线程。但类似publishOn的一些操作符会创建线程。同时,作为一种任务共享形式,这些操作符可能会从其他任务池(workpool)——如果其他任务池是空闲的话——那里“偷”线程。因此,无论是Flux、Mono还是Subscriber都应该精于线程处理。它们依赖这些操作符来管理线程和任务池。publishOn强制下一个操作符(很可能包括下一个的下一个…​)来运行在一个不同的线程上。类似的,subscribeOn强制上一个操作符(很可能包括上一个的上一个…​)来运行在一个不同的线程上。记住,在你订阅(subscribe)前,你只是定义了处理流程,而没有启动发布者。基于此,Reactor可以使用这些规则来决定如何执行操作链。然后,一旦你订阅了,整个流程就开始工作了。

Scheduler.parallel()创建一个基于单线程ExecutorService的固定大小的任务线程池。因为可能会有一个或两个线程导致问题,它总是至少创建4个线程。然后publishOn方法便共享了这些任务线程,当publishOn请求元素的时候,会从任一个正在发出元素的线程那里获取元素。这样,就是进行了任务共享(一种资源共享方式)。Reactor还提供了好几种共享资源的方式,请参考Schedulers。
Scheduler.elastic()也能创建线程,它能够很方便地创建专门的线程(以便运行可能会阻塞资源的任务,比如一个同步服务)。

publishOn()可以用来切换执行线程。publishOn能够影响到其之后的操作符的执行线程,直到有新的publishOn出现。因此publishOn的位置很重要。如下例,

1
2
3
4
5
6
EmitterProcessor<Integer> processor = EmitterProcessor.create();
processor.publishOn(scheduler1)
.map(i->transform(i))
.publishOn(scheduler2)
.doOnNext(i->processNext(i))
.subscribe();

map()中的transform方法是在scheduler1的一个工作线程上执行的,而doOnNext()中的processNext方法是在scheduler2的一个工作线程上执行的。单线程的调度器可能用于不同阶段的任务或不同的订阅者,用于确保线程关联性。

Sinks

主要使用ChatGPT3.5翻译

综述

Sinks are constructs through which Reactive Streams signals can be programmatically pushed, with Flux or Mono semantics. These standalone sinks expose tryEmit methods that return an Sinks.EmitResult enum, allowing to atomically fail in case the attempted signal is inconsistent with the spec and/or the state of the sink.
This class exposes a collection of (Sinks.Many builders and Sinks.One factories. Unless constructed through the unsafe() spec, these sinks are thread safe in the sense that they will detect concurrent access and fail fast on one of the attempts. unsafe() sinks on the other hand are expected to be externally synchronized (typically by being called from within a Reactive Streams-compliant context, like a Subscriber or an operator, which means it is ok to remove the overhead of detecting concurrent access from the sink itself).

Sink 是一种通过 Flux 或 Mono 语义以编程方式推送 Reactive Streams 信号的构造物。这些独立的 Sink 暴露了 tryEmit 方法,该方法返回一个 Sinks.EmitResult 枚举,允许在尝试的信号与规范和/或 Sink 的状态不一致时原子失败。
该类公开了一组 Sinks.Many 构建器和 Sinks.One 工厂。除非通过 unsafe() 规范构建,否则这些 Sink 在线程安全上是可靠的,它们将检测并发访问,并在尝试中的一个失败时快速失败。另一方面,unsafe() Sink 预期进行外部同步(通常通过从 Reactive Streams 兼容的上下文内部调用,比如 Subscriber 或运算符,这意味着可以从 Sink 本身移除检测并发访问的开销)。

ManySpec many()

A base interface for standalone Sinks with Flux semantics.
The sink can be exposed to consuming code as a Flux via its asFlux() view.

一个独立的 Sink 接口,具有 Flux 语义。
这个 Sink 可以通过其 asFlux() 视图暴露给消费代码,表现为一个 Flux。

void emitNext(T t, EmitFailureHandler failureHandler)

A simplified attempt at emitting a non-null element via the tryEmitNext(Object) API, generating an onNext signal. If the result of the attempt is not a success, implementations SHOULD retry the tryEmitNext(Object) call IF the provided Sinks.EmitFailureHandler returns true. Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation (see below for the vanilla reactor-core behavior).
Generally, tryEmitNext(Object) is preferable since it allows a custom handling of error cases, although this implies checking the returned Sinks.EmitResult and correctly acting on it. This API is intended as a good default for convenience.
When the Sinks.EmitResult is not a success, vanilla reactor-core operators have the following behavior:

  • Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER: no particular handling. should ideally discard the value but at that point there’s no Subscriber from which to get a contextual discard handler.
  • Sinks.EmitResult.FAIL_OVERFLOW: discard the value (Operators.onDiscard(Object, Context)) then call emitError(Throwable, Sinks.EmitFailureHandler) with a Exceptions.failWithOverflow(String) exception.
  • Sinks.EmitResult.FAIL_CANCELLED: discard the value (Operators.onDiscard(Object, Context)).
  • Sinks.EmitResult.FAIL_TERMINATED: drop the value (Operators.onNextDropped(Object, Context)).
  • Sinks.EmitResult.FAIL_NON_SERIALIZED: throw an Sinks.EmissionException mentioning RS spec rule 1.3. Note that unsafe() never trigger this result. It would be possible for an Sinks.EmitFailureHandler to busy-loop and optimistically wait for the contention to disappear to avoid this case for safe sinks…

Might throw an unchecked exception as a last resort (eg. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED as described above, …).

以下是一个简化的尝试通过 tryEmitNext(Object) API 发出非空元素的示例,生成一个 onNext 信号。如果尝试的结果不成功,实现应根据提供的 Sinks.EmitFailureHandler 返回 true 来重试 tryEmitNext(Object) 调用。否则,失败将以预定义的方式处理,这可能取决于实际的 sink 实现(请参阅下面的 vanilla reactor-core 行为)。
通常情况下,tryEmitNext(Object) 是首选的,因为它允许对错误情况进行自定义处理,尽管这意味着需要检查返回的 Sinks.EmitResult 并正确地对其进行操作。这个 API 旨在作为方便的默认值。
当 Sinks.EmitResult 不成功时,vanilla reactor-core 操作符有以下行为:

  • Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER:没有特殊处理。理想情况下应该丢弃该值,但在这个时候没有订阅者可以从中获取上下文丢弃处理程序。
  • Sinks.EmitResult.FAIL_OVERFLOW:丢弃值(Operators.onDiscard(Object, Context)),然后使用 Exceptions.failWithOverflow(String) 异常调用 emitError(Throwable, Sinks.EmitFailureHandler)。
  • Sinks.EmitResult.FAIL_CANCELLED:丢弃值(Operators.onDiscard(Object, Context))。
  • Sinks.EmitResult.FAIL_TERMINATED:丢弃值(Operators.onNextDropped(Object, Context))。
  • Sinks.EmitResult.FAIL_NON_SERIALIZED:抛出一个提及 RS 规范规则 1.3 的 Sinks.EmissionException。请注意,unsafe() 永远不会触发此结果。对于安全的 sink,一个 Sinks.EmitFailureHandler 可能会忙于循环,并乐观地等待争用消失,以避免这种情况…

最后可能会抛出一个未检查的异常(例如,如果下游出现无法传播给任何异步处理程序的致命错误,一个冒泡异常,一个如上所述的 Sinks.EmitResult.FAIL_NON_SERIALIZED,等等)


入门Reactor的时候给的示例都是创建Flux的时候同时就把数据赋值了,比如:Flux.just、Flux.range等,从3.4.0版本后先创建Flux,再发送数据可使用Sinks完成。有两个比较容易混淆的方法:

Sinks.many().multicast() 如果没有订阅者,那么接收的消息直接丢弃

Sinks.many().unicast() 如果没有订阅者,那么保存接收的消息直到第一个订阅者订阅

Sinks.many().replay() 不管有多少订阅者,都保存所有消息

上面方法的对象背压策略支持两种:BackpressureBuffer、BackpressureError,在此场景肯定是选择BackpressureBuffer,需要指定缓存队列,初始化方法如下:Queues.get(queueSize).get()

数据提交有两个方法:

emitNext 指定提交失败策略同步提交

tryEmitNext 异步提交,返回提交成功、失败状态

若使用场景中我们不希望丢数据,可自定义失败策略,提交失败无限重试,当然也可以调用异步方法自己重试。