问题标签 [reactive-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
3315 浏览

scala - Akka Stream Source.queue 的背压策略不起作用

我试图理解为什么下面的代码片段正在做它正在做的事情。我原以为因为 Sink 不能比 Source 产生内容更快地产生需求,所以我会收到丢弃消息以响应某些提议(溢出策略设置为 Drop Buffer)以及错误和队列关闭消息在自毁片之后。

片段:

输出:

我认为我的误解是围绕在 QueueSource 类中使用getAsyncCallback。即使 QueueSource 中的报价调用使用正确的报价详细信息调用 stageLogic,阶段逻辑中此代码的实际处理程序在前一个元素完成处理之前不会被调用,因此没有用于检查缓冲区大小或应用溢出的逻辑策略正在得到应用... :-/

0 投票
2 回答
1087 浏览

scala - 在涉及多个 JVM 的 Akka Streams 中保持背压的方法

我知道从 Akka 2.4.16 开始,没有 Reactive Streams 的“远程”实现。该规范侧重于在单个 JVM 上运行的流。

但是,考虑到用例涉及另一个 JVM 进行某些处理,同时保持背压。这个想法是让一个主应用程序提供一个运行流的用户界面。例如,这个流有一个阶段执行一些应该在不同机器上运行的繁重计算。我对以分布式方式运行流的方法感兴趣 - 我遇到了一些文章指出了一些想法:

还有哪些其他选择?以上有什么明显的缺点吗?有什么特殊的特征需要考虑吗?

更新:这个问题不限于单个用例。我通常对在分布式环境中使用流的所有可能方式感兴趣。这意味着,例如,它可以只涉及一个将参与者集成在一起的流,.mapAsync或者例如,在通过 Akka HTTP 通信的两台机器上可能有两个单独的流。唯一的要求是必须在所有组件之间强制执行背压。

0 投票
0 回答
504 浏览

scala - 在到达文件末尾时停止从文件创建的 akka 流源

我通过从文件中读取内容创建了一个 akka-stream 源。

现在,当在可运行图形中使用它时,我希望整个流在到达使用makeFileSource(fp).

流当前不会在到达文件末尾时停止。

0 投票
1 回答
173 浏览

akka-stream - 不能将反应流订阅者与 akka 流源一起使用

我正在尝试将响应流订阅者附加到 akka 源。

我的来源似乎可以与一个简单的接收器(如 foreach)一起正常工作 - 但如果我放入一个由订阅者制作的真正接收器,我什么也得不到。

我的背景是:

我的测试用例是:

我得到输出:

为什么我没有得到 xxx、yyy 和 zzz?

0 投票
2 回答
558 浏览

reactive-programming - 从反应流 SubmissionPublisher 接收项目

我正在尝试 Java 9 中的一些新功能。所以我进行了一个测试,以拥有一个发布者,以给定的速率发布数字。我还实现了一个订阅者来收听这些出版物并将它们打印到控制台。

虽然我可能不完全理解如何使用这个 Api,因为该onNext()方法没有打印任何内容,getLastItem()只返回 0。

唯一似乎有效的部分是onSubscribe()正确初始化lastItem变量的部分。

有人可以告诉我我在测试中做错了什么吗?我希望测试能够打印这些数字并返回 5 作为最后一项。

我不得不说我只在 Angular2 中使用 Observables 和 Subjects,尽管它们似乎更容易理解。

0 投票
1 回答
179 浏览

akka - 为什么 Akka 流只有一个源和汇?

Akka Streams 文档明确指出,为了使流可运行,它必须有一个源和一个接收器。我想知道是什么强加了这种约束。将多个源合并为一个从单个流分叉的多个接收器的场景都是非常合理的。是否有任何技术原因支持这种限制?

0 投票
2 回答
3532 浏览

scala - Akka Stream 连接到多个接收器

我在akka流中实现了一个自定义组件,它将元素作为输入,根据一个键对它们进行分组和合并,然后通过十几个出口之一将它们发送出去。您可以将此组件视为一种 GroupBy 组件,它不会将流划分为子流,而是实际流。除了对传入元素进行分区之外,它还将它们合并为一个元素,即在组件内部发生了一些缓冲,因此 1 个元素输入并不一定意味着 1 个元素通过出口输出。

下面是所述组件的简化实现。

我现在将该组件的每个出口连接到不同的 Sink 并结合所有这些 sink 的物化值。

我已经用图形 DSL 尝试了一些东西,但还没有完全成功。有人愿意为我提供一个片段来做到这一点或指出我正确的方向吗?

提前致谢!

0 投票
1 回答
6812 浏览

project-reactor - 用于阻塞 I/O 任务的 ParallelFlux 与 flatMap()

我有一个 Project Reactor 链,其中包括一个阻塞任务(网络调用,我们需要等待响应)。我想同时运行多个阻塞任务。

似乎可以使用 ParallelFlux 或 flatMap() ,基本示例:

或者

这两种技术的优点是什么?一个比另一个更受欢迎吗?有没有其他选择?

0 投票
4 回答
9526 浏览

spring-webflux - 将 Flux 的结果与 Mono 的结果相结合

我开始使用 Project reactor,而我很少挣扎的一个地方是如何将来自 Mono 的东西与 Flux 结合起来。这是我的用例:

现在我想要实现的是:

如何结合来自 UserFlux 的响应并将这些用户与该组相关联,例如 group.addUsers(userfromFlux)。

有人可以帮助如何组合来自 userFlux 和 groupMono 的结果。我想我使用像 Zip 这样的东西,但它会从源代码进行一对一的映射。就我而言,我需要进行 1 到 N 映射。在这里,我有一个组,但需要将多个用户添加到该组中。返回Mono<List<Users>然后将 zip 运算符与单声道一起使用并提供此处提到的组合器是一个好主意
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, final BiFunction<? super T1, ? super T2, ? extends O> combinator)吗?

0 投票
1 回答
1805 浏览

scala - akka 流使用网络套接字

开始使用 akka-streams 我想构建一个简单的示例。在使用 web 套接字插件的 chrome 中,我只需通过发送 2 个命令即可连接到像https://blockchain.info/api/api_websocket这样的流wss://ws.blockchain.info/inv

  • {"op":"ping"}
  • {"op":"unconfirmed_sub"} 将在 chrome web socket 插件窗口中流式传输结果。

我尝试在 akka 流中实现相同的功能,但面临一些问题:

  • 执行了 2 个命令,但我实际上没有得到流输出
  • 相同的命令执行两次(ping 命令)

遵循http://doc.akka.io/docs/akka/2.4.7/scala/http/client-side/websocket-support.htmlhttp://doc.akka.io/docs/akka的教程时-http/10.0.0/scala/http/client-side/websocket-support.html#half-closed-client-websockets 下面是我的改编:

当使用来自http://doc.akka.io/docs/akka-http/10.0.0/scala/http/client-side/websocket-support.html#websocketclientflow修改如下概述的流版本时,再次,结果是相同输出的两倍:

见代码:

我怎样才能达到与 chrome 相同的结果

注意,我在版本中使用 akka,在版本中使用2.4.17akka-http10.0.5