问题标签 [reactive-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
572 浏览

scala - 迭代错误/异常处理与反应式流/akka-stream

令人惊讶的是,我在 Iteratees 和错误处理方面遇到了一些问题。

问题;

从一个(来自网络,必须是 InputStream)中读取一些字节,InputStream在这个 InputStream 上做一些分块/grouing(用于工作分配),然后将其转换为case class DataBlock(blockNum: Int, data: ByteString)用于发送给参与者的转换(Array[Bytes] 转换为紧凑字节字符串)。

流量;

InputStream.read -- bytes --> Group -- 1000 byte blocks --> Transform -- DataBlock --> Actors

编码;

问题;

我当前的 Iteratee 代码运行良好。但是,我希望能够处理任何一方的故障;

  1. 当 InputStreamread方法失败时 - 我想知道有多少字节/块已成功处理并从该点继续读取流。当在枚举器中读取时抛出错误,fut只是返回异常,没有状态,所以我不知道我在做什么,除非我将它传递给 rxing 演员(我不想这样做)
  2. 如果输出端失败或无法再接收 DataBlock 消息,因为 Actor 的缓冲区已满,保持从输入流读取

我该怎么做?

因为我需要定义的错误处理,我怎么能/我会更好地使用反应流/Akka-stream(实验性)或scalaz迭代来尝试这个?

0 投票
0 回答
1455 浏览

scala - Akka Stream 中的异常/错误处理

我已经定义了以下管道:

和流:

增强器看起来像这样:

现在,如果满足导致 Augmenter1 中异常的条件,则流程只会在异常的第一个实例处(成功地)终止,而不会引发任何异常。我希望能够做两件事:在链上捕获异常,然后跳到下一个事件。

我的问题:处理流程中的错误/异常的正确方法是什么?

谢谢

0 投票
1 回答
1494 浏览

scala - 测试 Akka 反应式流

我正在测试通过以下方式获得的传出流 TCP 连接流式传输消息的代码:

在我的测试中,我将结果Subscriber[ByteString]替换为虚拟订阅者,触发一些传出消息,并断言已按预期到达。我使用下面的方法来生成虚拟订阅者和未来的流结果。(到目前为止,一切都很好)

我的问题是:是否有一些规范的方法来测试流输出预期值,类似于 Akka 的TestActorRef?如果没有,是否有一些类似于上述函数的库函数?

0 投票
1 回答
760 浏览

akka - Composing Flow Graphs

I've been playing around with Akka Streams and get the idea of creating Flows and wiring them together using FlowGraphs.

I know this part of Akka is still under development so some things may not be finished and some other bits may change, but is it possible to create a FlowGraph that isn't "complete" - i.e. isn't attached to a Sink - and pass it around to different parts of my code to be extended by adding Flow's to it and finally completed by adding a Sink?

Basically, I'd like to be able to compose FlowGraphs but don't understand how... Especially if a FlowGraph has split a stream by using a Broadcast.

Thanks

0 投票
2 回答
292 浏览

javascript - 如何根据 Bacon 中的一些 EventStream 变化来切换流

考虑来自http://baconjs.github.io/的这个例子

如果我还有一个用于重置计数器的按钮和来自该按钮单击的事件流怎么办。如何根据重置点击流切换计数器流以重置计数器?我知道我必须使用 .flatMapLatest 方法,但是参考这个例子我不知道该怎么做。

0 投票
2 回答
2037 浏览

scala - 如何使用 Reactive Streams 进行 NIO 二进制处理?

是否有一些使用org.reactivestreams库使用 Java NIO 处理大型数据流的代码示例(用于高性能)?我的目标是分布式处理,所以使用 Akka 的例子是最好的,但我可以弄清楚。

似乎大多数(我希望不是全部)在 scala 中读取文件的示例都求助于Source(非二进制)或直接 Java NIO(甚至是类似的东西Files.readAllBytes!)

也许我错过了一个激活器模板?(Akka Streams with Scala!可以解决我需要的一切,除了二进制/NIO 端)

0 投票
1 回答
6189 浏览

reactive-programming - 如何合并两个流(不包含空值)并对对应用条件?

考虑我有两个数据流,有没有办法合并它们并对这两个流之间的数据应用条件?例如

如何使用 rxjs 获得上面的组合流?我想对组合流应用条件以引发一些通知。也可以使用最后一个已知的非空数据,例如参见下面的组合流。

我刚刚开始使用反应流的想法,所以如果我误解了反应流的想法,请纠正我。

0 投票
1 回答
899 浏览

scala - 如何发布或订阅物化的 Akka Stream 流程图?

我正在玩 Akka Stream,我试图在实现后弄清楚它的灵活性。

一种方法是使用低级反应流 API: http ://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M3/#akka.stream.scaladsl.PublisherSource

但是,您需要定义这些点以发布或订阅。有没有办法发布或订阅任意物化流图节点?这应该是可能的,因为物化流图只不过是参与者的集合。

例如:首先部署流程图1:A ~> B ~> C

然后,部署流程图 2 和 3:D ~> BB ~> E

0 投票
1 回答
502 浏览

jvm - 如何订阅在不同 JVM 上运行的反应式流实现?

假设我们有两个 Akka Stream 流,每个流都在自己的 JVM 上运行。

此示例在一个 JVM 上运行良好,但我如何订阅在不同 JVM 上运行的发布者?

我必须使用消息/队列中间件还是可以使用反应流 API 将两者连接在一起?

0 投票
5 回答
667 浏览

java - 多个枚举与一个枚举

当我偶然发现一些我不明白为什么这样做的事情时,我正在查看反应流规范的 Publisher ( AsyncIterablePublisher.java ) 的示例实现。

现实一点,我不像写这篇文章的人那样高级程序员,我确信这样做是有理由的。但我也无法解释为什么它会比这样做更好(我会这样做)。

有人可以向我解释为什么它会更好吗?优点缺点?