问题标签 [project-reactor]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
7861 浏览

spring - 在通量上同时使用 publishOn 和 subscribeOn 不会导致任何事情发生

每当我同时使用 subscribeOn 和 publishOn 时,都不会打印任何内容。如果我只使用一个,它将打印。如果我使用 subscribeOn(Schedulers.immediate()) 或弹性它可以工作。知道为什么吗?

据我了解,publishOn 会影响它发布的线程并订阅订阅者运行的线程。你能指出我正确的方向吗?

0 投票
3 回答
6200 浏览

java - Project Reactor - 如何从 Java 8 流创建滑动窗口 Flux

Java 8 Streams 不允许重用。这产生了一个难题,即在创建滑动窗口通量以计算像 x(i)*x(i-1) 这样的关系时如何重用流。

以下代码基于移位运算符的思想。我用 skip(1) 移动第一个流以创建第二个流。

这是上述代码的可视化表示:

不幸的是,此代码错误为:

显而易见的解决方法是缓存元素并确保缓存大小大于或等于流大小:

或使用流替换:

第二种解决方案将重新执行 skip(1) 序列的原始序列。

然而,一个有效的解决方案只需要一个大小为 2 的缓冲区。如果流恰好是一个大文件,这很重要:

如何有效地缓冲流,以便对主 Flux 的多个订阅不会导致所有内容都被读入内存或导致重新执行?

0 投票
3 回答
47172 浏览

java - 如何转换单声道> 进入通量

我正在将用 RxJava 1.x 编写的小项目转换为 Reactor 3.x。一切都很好,只是我不知道如何flatMap(Observable::from)用合适的对应物替换。我有Mono<List<String>>并且我需要将其转换为Flux<String>.

0 投票
1 回答
130 浏览

java - Reactor 3.x (Java):用于网页抓取

反应堆菜鸟在这里。

这更像是一个 HowTo 问题。

假设我有一个要抓取的网站,其中包含一组分页的搜索结果。搜索结果页数未知。每个搜索页面都有一个指向下一页的链接。我想从所有页面中抓取所有搜索结果并处理每个搜索结果。

我如何使用 Reactor (Mono/Flux) 在 Java 中实现这一点?

我想尽可能“积极”地做到这一点。

基本上,以下命令式伪代码的 Reactor (3.x) 版本:

0 投票
3 回答
45814 浏览

project-reactor - 从通量转换为单声道

如何从具有 1 个元素的 Flux 转换为 Mono?

如何使它成为 Mono(1)?

0 投票
1 回答
1057 浏览

java - 如何正确管理 Reactor 中的可关闭资源

我有一个 http 客户端和执行程序,所有工作完成后应该关闭。

我正在尝试以这里为 RxJava 1.x 描述的方式使用 Flux.using 方法: https ://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/反应式/chapter08/ResourceManagement.java

我的资源创建方法:

然后我使用:

问题是客户端连接甚至在第一个请求发出之前就关闭了。

没有找到任何反应堆的例子。

谢谢

0 投票
6 回答
27775 浏览

java - 反应式编程的优点/缺点

我一直在学习和尝试使用 Reactor 和 RxJava 的反应式编码。我确实明白,与单线程执行相比,反应式编码可以更好地利用 CPU。

在基于 Web 的应用程序中,反应式编程与命令式编程之间是否有任何具体的比较?

通过使用反应式编程而不是非反应式编程,我获得的性能增益和吞吐量是多少?

还有反应式编程的优缺点是什么?

有没有统计基准?

0 投票
4 回答
1107 浏览

java - 关闭 Reactor 3.0.4 中的 netty TcpClient 通道

我刚刚将 projectreactor.io 从 reactor OLD升级:[core: 3.0.1 .RELEASE, netty: 0.5.2 .RELEASE] 到 reactor NEW [core: 3.0.4 .RELEASE, netty: 0.6.0 .RELEASE]。

我打开一个 TcpClient 连接并想稍后关闭它。

在我使用的版本中

断开我的客户端与服务器的连接。

新版本中是否有等效调用?我找不到一个!

我在创建我的 TcpClient 时得到的NettyInboundNettyOutboundtcpClient.newHandler(...)

  • .context().dispose()
  • .context().channel().disconnect()
  • .context().channel().close()
  • TcpResources.reset()

他们似乎都没有正确地完成这项工作。

我注意到.context().onClose(...)正在调用相应的 -callback 。但是经过一些额外的等待,服务器端会检查连接。服务器端是普通的 NIO2 而不是 reactor/netty,当客户端升级时,服务器端保持不变。

使用OLD客户端,我.isOpen() == false为服务器端的每个频道提供服务。

使用客户端,我可以获得.isOpen() == true服务器端的每个频道。大多数时候我什至可以写信给频道。.isOpen() == false一些通道在写入几个字节后切换到。

0 投票
3 回答
1090 浏览

multithreading - 反应式并行化不起作用

使用项目 Reactor 3.0.4.RELEASE。从概念上讲,在 RxJava 中也应该是相同的。

这个想法是client.refreshPods为每个组在单独的线程中运行。

编辑:我publishOn在发布这个问题之前和这里给出的答案之后尝试过,但输出没有改变。

客户:

问题是每个组的日志语句Attempting to refresh pod都打印在同一个线程上。我在这里想念什么?

测试运行的日志

0 投票
0 回答
69 浏览

java - 使用reactor管理依赖任务

我有一个用例,其中出现了一些消息,每个消息都会触发一组任务,其中一些可能相互依赖,即。TaskD 需要 TaskA 和 TaskB 的结果,而 TaskC 独立于它们的任务,可以并行运行。

每条消息的任务和依赖关系可能会有所不同,可以根据消息类型进行配置。要求必须完成所有任务才能存储最终处理结果。

一些用于依赖管理的伪配置。

[msgType=aa]

  • 任务A
  • 任务B
  • 任务C
  • 任务D,依赖=任务A,任务B

我正在使用 Java 8 和 spring-boot。你能建议我用最好的方法干净地解决这个问题reactor吗?reactor适合这里吗?