问题标签 [reactive-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2212 浏览

scala - 端到端反应式流式处理 RESTful 服务(又名 Back-Pressure over HTTP)

我一直试图在网上澄清这个问题一段时间没有成功,所以我会尝试在这里问它。

我想找到一些资源或示例来展示我如何构建端到端完全背压的 REST 服务 + 客户端。我的意思是,我希望看到,给定一个实现 Reactive Streams 的 REST 客户端(无论是在 Akka、JS 还是其他),我将拥有(并且能够“可视化”)在整个过程中处理的背压REST 服务器构建,例如使用 Akka-Http。

需要明确的是,我正在寻找类似以下谈话的内容(但我找不到幻灯片或视频来确认它): http: //oredev.org/2014/sessions/reactive-streaming-restful-applications-with-akka -http

我对我看到的大多数示例的怀疑是关于我可以找到很多 REST 服务(服务器)在后端使用 Akka Http 和 Akka 流的情况,但我不确定背压是否通过 HTTP “通信”和 REST,如果客户端正在实现 Reactive Streams。在这种情况下,我会通过 TCP/HTTP 桥接一个“流”还是只有 2 个独立的流?这是我主要的怀疑和困惑。

希望我足够清楚,有人能够对此事有所了解。
无论如何,谢谢!

0 投票
1 回答
1657 浏览

scala - Akka-Streams:Kafka 和 Cassandra 的至少一次交付行为

我从 Akka Streams 开始,到目前为止一切顺利。但是,我遇到了一个我不知道如何处理的用例。该场景是一个流,其中 ActorPublisher 作为源,它使用来自 Kafka 的消息,而订阅者作为接收器,用于更新 Cassandra 表。

Kafka ~> 一些映射操作 ~> Cassandra

关键是,每次成功处理消息并插入 Cassandra 时,我都想明确地向 Kafka 确认,以便我可以在发生灾难和服务失败时重新读取消息,即至少某种一次交付行为。就 Akka Streams 而言,我该如何处理?是受支持的方案吗?

确实,我总是可以为 Kafka 消费者配置自动提交行为,但我宁愿控制我阅读消息的方式。

更新

关于这个话题,我们目前正在评估Reactive Kafka,他们在 0.8 版本的 kafka 中包含手动提交(这些人的荣誉)。这个特性将允许我们实现我们需要的 alod 行为。

0 投票
1 回答
1022 浏览

servlets - 在 Servlet 中使用 java 9 Flows 的反应流用例?

我正在寻找在 servlet 容器(或只是 HTTP 服务器)中使用反应流的用例。

Jetty 项目已经开始被问到:“Jetty 是反应式的吗?” 我们已经注意到向 java 9 添加反应流的提议。

因此,我们已经开始了一些使用反应流 API 进行异步 servlet IO 的实验,这很有趣......但是由于我们缺乏真正的用例来关注哪些问题是最重要的,因此缺乏任何关注点。

是否有人可以分享/解释任何好的用例,以便我们可以指导我们的码头实验以满足他们的需求。我想象的那种事情是让基于 RS 的数据库发布者在 HTTP 响应或 websocket 连接上一直发送对象,使用 Flow.Processors 进行转换。

0 投票
1 回答
71 浏览

scala - 我可以实施自己的溢出策略吗?

是否有可能(或者将来可能)将我自己的OverflowStrategy实现为元素当前缓冲区的函数?还是有特定的理由不允许这样做?

谢谢!

0 投票
1 回答
1427 浏览

scala - 如何从广播的 Akka 流中获取订阅者和发布者?

使用更复杂的图表时,我在让发布者和订阅者退出我的流程时遇到问题。我的目标是提供发布者和订阅者的 API,并在内部运行 Akka 流。这是我的第一次尝试,效果很好。

但是对于更复杂的广播图,我不确定如何获取订阅者和发布者对象?我需要部分图吗?

0 投票
1 回答
256 浏览

java - Java / Scala - ReactiveStreams / Enumeratee 如何持续读取输入流,直到 read() 返回 -1

目前我有一个 Publisher[InputStream] 我怎么知道读它有类似 Java8 Streams API 提供的东西:

目前我使用 Scala 和 playframework 并希望通过实验性的 akka-streams / reactivestreams 库对输出进行分块,但是我不知道,因为关于这两件事的文档很少甚至没有。有什么建议么?

0 投票
1 回答
696 浏览

scala - 与 Akka Streams 同步反馈

我想要实现的是用 akka 流实现类似同步反馈循环的东西。

假设你有一个Flow[Int].filter(_ % 5 == 0). 当您将Int's 流广播到此流并直接在其后面压缩元组时,您会得到类似

有没有办法发出一个Option[Int],它指示在我推动下一个元素通过它之后流是否发出一个元素?

我想过实现我自己DetachedStage的右前后后Flow保持一个状态,每当流量拉到之前的舞台上时,我就知道他需要下一个元素。当后面的舞台没有收到元素时,它是None。

不幸的是,结果并不好,并且被许多职位所淘汰。

旁注

过滤器流只是一个例子,它可以是一个非常长的流,我无法提供Option在它的每个阶段发出的能力,所以我真的必须知道,流是否推动了下一个或没有而是从下游请求下一个

我也玩过conflateand expand,但这些结果的位置偏移更糟

我在配置中更改的一件事是流的initial缓冲区max,因此我可以确定指示的需求确实是在我推动它的元素之后。

很高兴获得有关如何解决此问题的一些建议!

0 投票
1 回答
323 浏览

scala - 如何处理带有内部排队反应流订阅者的 OnComplete 消息?

我正在使用带有简单反应流的 Akka-Stream 1.0:

  • 发布者发送 N 条消息
  • 订阅者消费了 N 条消息

发布者将通过发送OnComplete消息在 N 条消息(动态)后关闭流。

订阅者收到消息并立即进入canceled状态。问题是,订阅者需要一些时间来处理每条消息,这意味着我通常会积压一些消息 - 随着订阅者的获取,这些消息无法再处理canceled- 恕我直言ActorSubscriber.scala:195

处理消息意味着我的订阅者会将工作卸载给其他人(通过 Spray 发送内容ChunkedMessage)并在消息完成后立即收到确认消息。由于 Actor 被取消,ack 消息永远不会被处理,并且 backlog 会被处理。

有什么建议让我完成积压工作?我可以“发明”我自己的“完成标记”,但这对我来说听起来很奇怪。显然,我的代码MaxInFlightRequestStrategy最多可以使用 1 - 因为那里的需求总是只有 1 - 这意味着我从来没有积压的消息。

0 投票
1 回答
90 浏览

scala - 基于发布者的源不输出元素

我基于这样的 ReactiveStreams Publisher 为 Akka Stream 创建了一个 Source:

它将向 Flickr 发出搜索请求,并将结果作为JsValues 获取。我试图将它连接到许多不同的流和接收器,但这将是最基本的设置:

我看到onNext被调用了几次,然后是onComplete. 但是,Sink 没有收到任何东西。我错过了什么,这不是创建源的有效方法吗?

0 投票
2 回答
3404 浏览

scala - 在 akka-stream 中,如何从期货集合中创建无序 Source

我需要akka.stream.scaladsl.Source[T, Unit]Future[T].

例如,有一个返回整数的期货集合,

如何创建一个

从中。

我不能使用Future.sequence组合器,从那时起我会等待每个未来完成,然后再从源头获取任何东西。一旦任何未来完成,我想以任何顺序获得结果。

我知道这Source是一个纯粹的功能性 API,它不应该在以某种方式实现它之前运行任何东西。所以,我的想法是使用一个Iterator(这是懒惰的)来创建一个源:

但这将是期货的来源,而不是实际价值的来源。我也可以阻止next使用Await.result(future),但我不确定哪个线程池的线程会被阻止。这也将按顺序调用期货,而我需要并行执行。

更新 2:事实证明有一种更简单的方法(感谢 Viktor Klang):

更新:这是我根据@sschaef 的回答得到的: