publishOn与subscribeOn

Reactor提供了两种在响应式链中调整调度器Scheduler的方法:publishOn和subscribeOn。它们都接受一个Scheduler作为参数,从而可以改变调度器。

  • publishOn的用法和处于订阅链(subscriber-chain)中的其他操作符一样。它将上游信号传给下游,同时执行指定的调度器Scheduler的某个工作线程上的回调。它会改变后续操作符的执行线程(直到下一个publishOn出现在这个链上)。
  • subscribeOn用于订阅(subscription)过程,作用于向上的订阅链(发布者在被订阅时才激活,订阅的传递方向是向上游的)。所以,无论你把subscribeOn至于操作链的什么位置,它都会影响到源头的线程执行环境(context)。但是,它不会影响后续的publishOn,后者仍能够切换其后操作符的线程执行环境。

Flux和Mono不会创建线程。但类似publishOn的一些操作符会创建线程。同时,作为一种任务共享形式,这些操作符可能会从其他任务池(workpool)——如果其他任务池是空闲的话——那里“偷”线程。因此,无论是Flux、Mono还是Subscriber都应该精于线程处理。它们依赖这些操作符来管理线程和任务池。publishOn强制下一个操作符(很可能包括下一个的下一个…​)来运行在一个不同的线程上。类似的,subscribeOn强制上一个操作符(很可能包括上一个的上一个…​)来运行在一个不同的线程上。记住,在你订阅(subscribe)前,你只是定义了处理流程,而没有启动发布者。基于此,Reactor可以使用这些规则来决定如何执行操作链。然后,一旦你订阅了,整个流程就开始工作了。

Scheduler.parallel()创建一个基于单线程ExecutorService的固定大小的任务线程池。因为可能会有一个或两个线程导致问题,它总是至少创建4个线程。然后publishOn方法便共享了这些任务线程,当publishOn请求元素的时候,会从任一个正在发出元素的线程那里获取元素。这样,就是进行了任务共享(一种资源共享方式)。Reactor还提供了好几种共享资源的方式,请参考Schedulers。
Scheduler.elastic()也能创建线程,它能够很方便地创建专门的线程(以便运行可能会阻塞资源的任务,比如一个同步服务)。

publishOn()可以用来切换执行线程。publishOn能够影响到其之后的操作符的执行线程,直到有新的publishOn出现。因此publishOn的位置很重要。如下例,

1
2
3
4
5
6
EmitterProcessor<Integer> processor = EmitterProcessor.create();
processor.publishOn(scheduler1)
.map(i->transform(i))
.publishOn(scheduler2)
.doOnNext(i->processNext(i))
.subscribe();

map()中的transform方法是在scheduler1的一个工作线程上执行的,而doOnNext()中的processNext方法是在scheduler2的一个工作线程上执行的。单线程的调度器可能用于不同阶段的任务或不同的订阅者,用于确保线程关联性。

Sinks

主要使用ChatGPT3.5翻译

综述

Sinks are constructs through which Reactive Streams signals can be programmatically pushed, with Flux or Mono semantics. These standalone sinks expose tryEmit methods that return an Sinks.EmitResult enum, allowing to atomically fail in case the attempted signal is inconsistent with the spec and/or the state of the sink.
This class exposes a collection of (Sinks.Many builders and Sinks.One factories. Unless constructed through the unsafe() spec, these sinks are thread safe in the sense that they will detect concurrent access and fail fast on one of the attempts. unsafe() sinks on the other hand are expected to be externally synchronized (typically by being called from within a Reactive Streams-compliant context, like a Subscriber or an operator, which means it is ok to remove the overhead of detecting concurrent access from the sink itself).

Sink 是一种通过 Flux 或 Mono 语义以编程方式推送 Reactive Streams 信号的构造物。这些独立的 Sink 暴露了 tryEmit 方法,该方法返回一个 Sinks.EmitResult 枚举,允许在尝试的信号与规范和/或 Sink 的状态不一致时原子失败。
该类公开了一组 Sinks.Many 构建器和 Sinks.One 工厂。除非通过 unsafe() 规范构建,否则这些 Sink 在线程安全上是可靠的,它们将检测并发访问,并在尝试中的一个失败时快速失败。另一方面,unsafe() Sink 预期进行外部同步(通常通过从 Reactive Streams 兼容的上下文内部调用,比如 Subscriber 或运算符,这意味着可以从 Sink 本身移除检测并发访问的开销)。

ManySpec many()

A base interface for standalone Sinks with Flux semantics.
The sink can be exposed to consuming code as a Flux via its asFlux() view.

一个独立的 Sink 接口,具有 Flux 语义。
这个 Sink 可以通过其 asFlux() 视图暴露给消费代码,表现为一个 Flux。

void emitNext(T t, EmitFailureHandler failureHandler)

A simplified attempt at emitting a non-null element via the tryEmitNext(Object) API, generating an onNext signal. If the result of the attempt is not a success, implementations SHOULD retry the tryEmitNext(Object) call IF the provided Sinks.EmitFailureHandler returns true. Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation (see below for the vanilla reactor-core behavior).
Generally, tryEmitNext(Object) is preferable since it allows a custom handling of error cases, although this implies checking the returned Sinks.EmitResult and correctly acting on it. This API is intended as a good default for convenience.
When the Sinks.EmitResult is not a success, vanilla reactor-core operators have the following behavior:

  • Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER: no particular handling. should ideally discard the value but at that point there’s no Subscriber from which to get a contextual discard handler.
  • Sinks.EmitResult.FAIL_OVERFLOW: discard the value (Operators.onDiscard(Object, Context)) then call emitError(Throwable, Sinks.EmitFailureHandler) with a Exceptions.failWithOverflow(String) exception.
  • Sinks.EmitResult.FAIL_CANCELLED: discard the value (Operators.onDiscard(Object, Context)).
  • Sinks.EmitResult.FAIL_TERMINATED: drop the value (Operators.onNextDropped(Object, Context)).
  • Sinks.EmitResult.FAIL_NON_SERIALIZED: throw an Sinks.EmissionException mentioning RS spec rule 1.3. Note that unsafe() never trigger this result. It would be possible for an Sinks.EmitFailureHandler to busy-loop and optimistically wait for the contention to disappear to avoid this case for safe sinks…

Might throw an unchecked exception as a last resort (eg. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED as described above, …).

以下是一个简化的尝试通过 tryEmitNext(Object) API 发出非空元素的示例,生成一个 onNext 信号。如果尝试的结果不成功,实现应根据提供的 Sinks.EmitFailureHandler 返回 true 来重试 tryEmitNext(Object) 调用。否则,失败将以预定义的方式处理,这可能取决于实际的 sink 实现(请参阅下面的 vanilla reactor-core 行为)。
通常情况下,tryEmitNext(Object) 是首选的,因为它允许对错误情况进行自定义处理,尽管这意味着需要检查返回的 Sinks.EmitResult 并正确地对其进行操作。这个 API 旨在作为方便的默认值。
当 Sinks.EmitResult 不成功时,vanilla reactor-core 操作符有以下行为:

  • Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER:没有特殊处理。理想情况下应该丢弃该值,但在这个时候没有订阅者可以从中获取上下文丢弃处理程序。
  • Sinks.EmitResult.FAIL_OVERFLOW:丢弃值(Operators.onDiscard(Object, Context)),然后使用 Exceptions.failWithOverflow(String) 异常调用 emitError(Throwable, Sinks.EmitFailureHandler)。
  • Sinks.EmitResult.FAIL_CANCELLED:丢弃值(Operators.onDiscard(Object, Context))。
  • Sinks.EmitResult.FAIL_TERMINATED:丢弃值(Operators.onNextDropped(Object, Context))。
  • Sinks.EmitResult.FAIL_NON_SERIALIZED:抛出一个提及 RS 规范规则 1.3 的 Sinks.EmissionException。请注意,unsafe() 永远不会触发此结果。对于安全的 sink,一个 Sinks.EmitFailureHandler 可能会忙于循环,并乐观地等待争用消失,以避免这种情况…

最后可能会抛出一个未检查的异常(例如,如果下游出现无法传播给任何异步处理程序的致命错误,一个冒泡异常,一个如上所述的 Sinks.EmitResult.FAIL_NON_SERIALIZED,等等)


入门Reactor的时候给的示例都是创建Flux的时候同时就把数据赋值了,比如:Flux.just、Flux.range等,从3.4.0版本后先创建Flux,再发送数据可使用Sinks完成。有两个比较容易混淆的方法:

Sinks.many().multicast() 如果没有订阅者,那么接收的消息直接丢弃

Sinks.many().unicast() 如果没有订阅者,那么保存接收的消息直到第一个订阅者订阅

Sinks.many().replay() 不管有多少订阅者,都保存所有消息

上面方法的对象背压策略支持两种:BackpressureBuffer、BackpressureError,在此场景肯定是选择BackpressureBuffer,需要指定缓存队列,初始化方法如下:Queues.get(queueSize).get()

数据提交有两个方法:

emitNext 指定提交失败策略同步提交

tryEmitNext 异步提交,返回提交成功、失败状态

若使用场景中我们不希望丢数据,可自定义失败策略,提交失败无限重试,当然也可以调用异步方法自己重试。

糖醋排骨/猪脚

1
2
3
4
5
料酒   1勺
酱油 2勺
唐 3勺
醋 4勺
水 5勺
  • 先焯水去掉浮沫
  • 排骨需要煮半小时左右,脱骨即可调色调味后大火收汁
  • 猪脚使用砂锅炖2小时左右,可以调色起锅

TIPs

  • 用鸡蛋清腌制肉类会十分嫩滑
  • 老抽调色是真的好用

Hexo

基本用法

1
2
3
4
5
6
7
8
hexo new "name"       # 新建文章
hexo new page "name" # 新建页面
hexo gernerate|g # 生成页面
hexo deploy|d # 部署到远程机器
hexo g -d # 生成页面并部署
hexo server|s # 使用hexo-server部署,前提是安装了npm install hexo-server --save
hexo clean # 清除缓存和已生成的静态文件
hexo help # 帮助

PM2

1
2
3
4
pm2 start yourscripts.js
pm2 list
pm2 stop app_name|app_id
pm2 stop all

Next主题

安装NexT
修改备案号
配置

相关链接

linux使用pm2后台运行hexo教程

PM2实用入门指南

使用 Gitea + Git Hook 实现 Hexo 博客源码托管与持续集成

使用 Gitea Actions 实现 Hexo 博客持续集成与持续部署

Maven 教程(10)— Maven依赖详解
https://blog.csdn.net/liupeifeng3514/article/details/79545022

spring boot在maven多模块项目中怎么加载到其他模块的Controller?
https://www.oschina.net/question/565351_2268153?sort=time
springboot多模块打包后,无法扫描子包jar中的注解,哪位大神遇到过?
https://www.oschina.net/question/2557533_2244047
https://bbs.csdn.net/topics/393697365

maven-assembly-plugin 入门指南
https://www.jianshu.com/p/14bcb17b99e0

Springboot在idea中直接运行没有问题,打war包,部署到外置的Tomcat容器后,却无法访问项目
https://www.jianshu.com/p/1ba1187250cb

maven assembly 配置详解
https://www.cnblogs.com/zhangtan/p/7645241.html

Spring Boot 项目本地运行无异常,部署到 Linux 服务器运行报错:java.lang.ClassNotFoundException
https://www.cnblogs.com/southday/p/12115013.html

SpringBoot打包Jar引入第三方jar包,部署出现ClassNotFound的现象
https://blog.csdn.net/by_yangge/article/details/100657670

maven打包工具assembly打包本地jar的解决办法
https://www.cnblogs.com/maocaowu/p/12175183.html

用maven assembly插件打jar包实现依赖包归档
https://blog.csdn.net/e5945/article/details/7777286
https://blog.csdn.net/xu187/article/details/99851390

– 跨行匹配
STORED BY([\s\S]*?)TBLPROPERTIES

()\r\nCOMMENT([\s\S]*?)MAP KEYS TERMINATED BY ‘:’)

(\w{1,}).*

(^(\w{1,}).*)(\r\n)$ -> $1

^(\w{1,}).*([,\r\n| \r\n])$ -> $1$2

匹配 1900-2999 年月日 YYYYMMDD
^((((19|20)\d{2})(0(1|[3-9])|1[012])(0[1-9]|[12]\d|30))|(((19|20)\d{2})(0[13578]|1[02])31)|(((19|20)\d{2})02(0[1-9]|1\d|2[0-8]))|((((19|20)([13579][26]|[2468][048]|0[48]))|(2000))0229))$

YYYYMM 1900-2999 年月
^((19|20)\d{2})(0([1-9])|1[012])$

YYMMDD 1900-2999 年月日
^(((\d{2})(0(1|[3-9])|1[012])(0[1-9]|[12]\d|30))|((\d{2})(0[13578]|1[02])31)|((\d{2})02(0[1-9]|1\d|2[0-8]))|(((([13579][26]|[2468][048]|0[48]))|(2000))0229))$

匹配 24小时制时分秒 hhmmss
^(([0-1][0-9]|2[0-3])([0-5][0-9])([0-5][0-9]))$

YYYYMMDDhhmmss
^((((19|20)\d{2})(0(1|[3-9])|1[012])(0[1-9]|[12]\d|30))|(((19|20)\d{2})(0[13578]|1[02])31)|(((19|20)\d{2})02(0[1-9]|1\d|2[0-8]))|((((19|20)([13579][26]|[2468][048]|0[48]))|(2000))0229))(([0-1][0-9]|2[0-3])([0-5][0-9])([0-5][0-9]))$

身份证位数
^\d{15}$|^\d{18}$|^\d{17}[Xx]$
以下其实缺少校验和
( ^[1-9]\d{5}(18|19|([23]\d))\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\d{3}[0-9Xx]$)|(^[1-9]\d{5}\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\d{2}$)

自然数
^(0|[1-9][0-9]*)$
n位自然数 n>1 ^(0|[1-9][0-9]{0,(n-1)})$

小数
^(0|[1-9][0-9]{0,2})(.[0-9]{1,3})?$

非纯数字 且 不包含中英文的;, . 、
^((([0-9]))([^,;,;、.\d]+)([0-9]))+$

^[1-9][0-9]{5}$

^[A-Z0-9]{6}$

sqlserver 匹配类型
^\b+(\w+)\b+(image|money|int|decimal|text|smallint|varchar|datetime|numeric|tinyint|smalldatetime|float|char|bigint|nvarchar|sysname).*–(\w+)\b+

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
$ hexo generate

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment

0%