Reactor 拾遗



  • publishOn的用法和处于订阅链(subscriber-chain)中的其他操作符一样。它将上游信号传给下游,同时执行指定的调度器Scheduler的某个工作线程上的回调。它会改变后续操作符的执行线程(直到下一个publishOn出现在这个链上)。
  • subscribeOn用于订阅(subscription)过程,作用于向上的订阅链(发布者在被订阅时才激活,订阅的传递方向是向上游的)。所以,无论你把subscribeOn至于操作链的什么位置,它都会影响到源头的线程执行环境(context)。但是,它不会影响后续的publishOn,后者仍能够切换其后操作符的线程执行环境。




EmitterProcessor<Integer> processor = EmitterProcessor.create();





Sinks are constructs through which Reactive Streams signals can be programmatically pushed, with Flux or Mono semantics. These standalone sinks expose tryEmit methods that return an Sinks.EmitResult enum, allowing to atomically fail in case the attempted signal is inconsistent with the spec and/or the state of the sink.
This class exposes a collection of (Sinks.Many builders and Sinks.One factories. Unless constructed through the unsafe() spec, these sinks are thread safe in the sense that they will detect concurrent access and fail fast on one of the attempts. unsafe() sinks on the other hand are expected to be externally synchronized (typically by being called from within a Reactive Streams-compliant context, like a Subscriber or an operator, which means it is ok to remove the overhead of detecting concurrent access from the sink itself).

Sink 是一种通过 Flux 或 Mono 语义以编程方式推送 Reactive Streams 信号的构造物。这些独立的 Sink 暴露了 tryEmit 方法,该方法返回一个 Sinks.EmitResult 枚举,允许在尝试的信号与规范和/或 Sink 的状态不一致时原子失败。
该类公开了一组 Sinks.Many 构建器和 Sinks.One 工厂。除非通过 unsafe() 规范构建,否则这些 Sink 在线程安全上是可靠的,它们将检测并发访问,并在尝试中的一个失败时快速失败。另一方面,unsafe() Sink 预期进行外部同步(通常通过从 Reactive Streams 兼容的上下文内部调用,比如 Subscriber 或运算符,这意味着可以从 Sink 本身移除检测并发访问的开销)。

ManySpec many()

A base interface for standalone Sinks with Flux semantics.
The sink can be exposed to consuming code as a Flux via its asFlux() view.

一个独立的 Sink 接口,具有 Flux 语义。
这个 Sink 可以通过其 asFlux() 视图暴露给消费代码,表现为一个 Flux。

void emitNext(T t, EmitFailureHandler failureHandler)

A simplified attempt at emitting a non-null element via the tryEmitNext(Object) API, generating an onNext signal. If the result of the attempt is not a success, implementations SHOULD retry the tryEmitNext(Object) call IF the provided Sinks.EmitFailureHandler returns true. Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation (see below for the vanilla reactor-core behavior).
Generally, tryEmitNext(Object) is preferable since it allows a custom handling of error cases, although this implies checking the returned Sinks.EmitResult and correctly acting on it. This API is intended as a good default for convenience.
When the Sinks.EmitResult is not a success, vanilla reactor-core operators have the following behavior:

  • Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER: no particular handling. should ideally discard the value but at that point there’s no Subscriber from which to get a contextual discard handler.
  • Sinks.EmitResult.FAIL_OVERFLOW: discard the value (Operators.onDiscard(Object, Context)) then call emitError(Throwable, Sinks.EmitFailureHandler) with a Exceptions.failWithOverflow(String) exception.
  • Sinks.EmitResult.FAIL_CANCELLED: discard the value (Operators.onDiscard(Object, Context)).
  • Sinks.EmitResult.FAIL_TERMINATED: drop the value (Operators.onNextDropped(Object, Context)).
  • Sinks.EmitResult.FAIL_NON_SERIALIZED: throw an Sinks.EmissionException mentioning RS spec rule 1.3. Note that unsafe() never trigger this result. It would be possible for an Sinks.EmitFailureHandler to busy-loop and optimistically wait for the contention to disappear to avoid this case for safe sinks…

Might throw an unchecked exception as a last resort (eg. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED as described above, …).

以下是一个简化的尝试通过 tryEmitNext(Object) API 发出非空元素的示例,生成一个 onNext 信号。如果尝试的结果不成功,实现应根据提供的 Sinks.EmitFailureHandler 返回 true 来重试 tryEmitNext(Object) 调用。否则,失败将以预定义的方式处理,这可能取决于实际的 sink 实现(请参阅下面的 vanilla reactor-core 行为)。
通常情况下,tryEmitNext(Object) 是首选的,因为它允许对错误情况进行自定义处理,尽管这意味着需要检查返回的 Sinks.EmitResult 并正确地对其进行操作。这个 API 旨在作为方便的默认值。
当 Sinks.EmitResult 不成功时,vanilla reactor-core 操作符有以下行为:

  • Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER:没有特殊处理。理想情况下应该丢弃该值,但在这个时候没有订阅者可以从中获取上下文丢弃处理程序。
  • Sinks.EmitResult.FAIL_OVERFLOW:丢弃值(Operators.onDiscard(Object, Context)),然后使用 Exceptions.failWithOverflow(String) 异常调用 emitError(Throwable, Sinks.EmitFailureHandler)。
  • Sinks.EmitResult.FAIL_CANCELLED:丢弃值(Operators.onDiscard(Object, Context))。
  • Sinks.EmitResult.FAIL_TERMINATED:丢弃值(Operators.onNextDropped(Object, Context))。
  • Sinks.EmitResult.FAIL_NON_SERIALIZED:抛出一个提及 RS 规范规则 1.3 的 Sinks.EmissionException。请注意,unsafe() 永远不会触发此结果。对于安全的 sink,一个 Sinks.EmitFailureHandler 可能会忙于循环,并乐观地等待争用消失,以避免这种情况…

最后可能会抛出一个未检查的异常(例如,如果下游出现无法传播给任何异步处理程序的致命错误,一个冒泡异常,一个如上所述的 Sinks.EmitResult.FAIL_NON_SERIALIZED,等等)


Sinks.many().multicast() 如果没有订阅者,那么接收的消息直接丢弃

Sinks.many().unicast() 如果没有订阅者,那么保存接收的消息直到第一个订阅者订阅

Sinks.many().replay() 不管有多少订阅者,都保存所有消息



emitNext 指定提交失败策略同步提交

tryEmitNext 异步提交,返回提交成功、失败状态
