问题标签 [reactive-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
140 浏览

java - Observable 由冷和热 observables 组成

我很难找到合适的方法来组成一个 observable ,它将从给定的冷 observable 中发出所有项目,A并且一旦它完成就继续使用 hot observable B

这是我的特定用例:我有一个数据收集器,它实时将数据附加到仅附加数据库(事件流)。当一个请求到达流所有事件流时,它预计会开始从数据库中流式传输所有内容,并且一旦数据库没有更多数据,它就会开始流式传输任何收集器流......正如您所看到的那样,两者都可以作为 observables 使用。

我是响应式编程的新手,因此我的问题可能有点抽象。任何帮助表示赞赏。

这是此行为的图表:

R是我们可观察到的结果,A是冷的,B是热的。R以 终止B

0 投票
1 回答
202 浏览

java - 按需执行热 Observable

举一个冷酷的例子:

它开始为每个新订阅者从头开始执行:

如果订阅者提前取消订阅,则可以停止执行:

现在,如果我们有一些实际的业务逻辑而不是示例 for 循环(不需要为每个订阅者重播,而是实时的),那么我们正在处理 hot observable ......

当我们想要它开始时,我们可能会做

因此第一个问题:如何仅在第一个观察者订阅时开始工作?(可能是可连接的可观察的?)

还有一个重要的问题:当最后一个订阅者退订时如何停止工作?(我不知道如何访问该主题的当前订阅,如果存在这样的解决方案,我希望找到没有共享全局状态的干净解决方案)

我能想到的一种解决方案是使用自定义运营商来提升主题,该运营商将管理订户......

0 投票
1 回答
359 浏览

rx-java - 反应式ipc、反应式流io、反应式流网和反应式套接字是什么关系

我在 Github 上发现了很多关于响应式流的 repos,似乎也有类似的。

但是,许多存储库几个月都没有更新。

我想知道他们之间的关系,我可以专注于最新的回购。

反应式ipc:https ://github.com/reactive-ipc/reactive-ipc-jvm

反应流网:https ://github.com/reactive-streams/reactive-streams-net-jvm

反应式套接字:https ://github.com/ReactiveSocket/reactivesocket-java.git

0 投票
2 回答
920 浏览

reactive-programming - 如何在 RxJS v5 中暂停和缓冲 Observables

我正在尝试对 HTTP 请求实施背压策略,以在某些条件下暂时阻止挂起的请求数秒。暂停的逻辑将基于另一个 Observable。

我的研究和理解使我相信pausableBuffered操作员完全符合我的需要。记录在这里http://reactivex.io/documentation/operators/backpressure.html

但是我在 ReactiveX v5 (5.0.0-beta.0) 中找不到这个运算符,迁移指南 (v4 - v5) 似乎表明它们已被删除。如果是这种情况,我如何使用 v5 可用的运算符来达到预期的结果?

0 投票
1 回答
274 浏览

stream - Akka Reactive Streams 处理器的用途

我试图在 akka 中理解 Reactive Streams。我已经阅读了这个博客http://bryangilbert.com/blog/2015/02/04/akka-reactive-streams/,我想我对它的工作原理有了基本的了解。然而,我不明白的是处理器在这个概念中的目的。它是干什么用的?订阅者请求 N-Objects 和发布者使用 onNext() 发送它们还不够吗?

0 投票
1 回答
304 浏览

akka - 是否可以加入 Akka Streams Source 的元素?

如果我有像他这样的来源:

是否可以在特定的分隔符模式上加入和重新拆分元素,例如在 '\n' 字符上以产生类似的结果?:

谢谢!

0 投票
1 回答
1750 浏览

scala - 图表创建错误:需求失败:入口 [] 和出口 [] 必须对应于入口 [in] 和出口 [out]

我正在使用 akka 流 graphDSL 创建一个可运行的图。流组件的入口/出口没有编译时错误。运行时抛出以下错误:

任何想法我应该验证什么以使其运行?

图结构:

0 投票
1 回答
347 浏览

scala - 故意停止正在运行的 akka 流图

假设我有以下简单的图表。

我们正在运行它

我希望通知 kafkaSource 停止(或人为完成)而不是推动下一个可用元素,以便下游连接的阶段也停止。

我该如何做到这一点?

场景是,我们在 kafka 中有数百万条消息,我们希望每天晚上 9 点(例如)停止处理消息,并假设我们正在通过干净关闭来停止正在运行的应用程序。

0 投票
1 回答
281 浏览

akka-stream - 如何在 Akka Streams 2.4.2 中运行 RunnableGraph 并确保入口/出口配置正确?

我已经使用 GraphDSL.create() 配置了一个 RunnableGraph。我还指定了一个 ClosedShape 并连接了所有出口/入口。当我尝试执行程序时,出现以下运行时异常:

requirement failed: The inlets [] and outlets [] must correspond to the inlets [filter.in] and outlets [out]

知道我没有正确连接入口和出口的地方吗?

这是图形代码:

0 投票
1 回答
463 浏览

scala - Akka-http: Http.cachedHostConnectionPoolHttps stops processing after sometime for non 2XX responses

I am trying to use cachedHostConnectionPoolHttps to make outbound https requests, but it seems that after some time for Non 2XX responses, the pool flow stops emitting out element's thus causing the entire flow to stop. The time of occurrence of this particular behaviour is quite random but happens and is reproducible.

Here is a sample test in which it stops emitting results after some time Expected OnNext(_), yet no element signaled during 10 seconds is thrown

Any pointers what is going on wrong here? It works for sometime, so I suspect something is going wrong while handling the non 2xx error.

I have tried turning off akka.http.host-connection-pool.max-retries = 0 and akka.http.host-connection-pool.idle-timeout = infinite, but no results.