问题标签 [reactive-streams]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 端到端反应式流式处理 RESTful 服务(又名 Back-Pressure over HTTP)
我一直试图在网上澄清这个问题一段时间没有成功,所以我会尝试在这里问它。
我想找到一些资源或示例来展示我如何构建端到端完全背压的 REST 服务 + 客户端。我的意思是,我希望看到,给定一个实现 Reactive Streams 的 REST 客户端(无论是在 Akka、JS 还是其他),我将拥有(并且能够“可视化”)在整个过程中处理的背压REST 服务器构建,例如使用 Akka-Http。
需要明确的是,我正在寻找类似以下谈话的内容(但我找不到幻灯片或视频来确认它): http: //oredev.org/2014/sessions/reactive-streaming-restful-applications-with-akka -http
我对我看到的大多数示例的怀疑是关于我可以找到很多 REST 服务(服务器)在后端使用 Akka Http 和 Akka 流的情况,但我不确定背压是否通过 HTTP “通信”和 REST,如果客户端正在实现 Reactive Streams。在这种情况下,我会通过 TCP/HTTP 桥接一个“流”还是只有 2 个独立的流?这是我主要的怀疑和困惑。
希望我足够清楚,有人能够对此事有所了解。
无论如何,谢谢!
scala - Akka-Streams:Kafka 和 Cassandra 的至少一次交付行为
我从 Akka Streams 开始,到目前为止一切顺利。但是,我遇到了一个我不知道如何处理的用例。该场景是一个流,其中 ActorPublisher 作为源,它使用来自 Kafka 的消息,而订阅者作为接收器,用于更新 Cassandra 表。
Kafka ~> 一些映射操作 ~> Cassandra
关键是,每次成功处理消息并插入 Cassandra 时,我都想明确地向 Kafka 确认,以便我可以在发生灾难和服务失败时重新读取消息,即至少某种一次交付行为。就 Akka Streams 而言,我该如何处理?是受支持的方案吗?
确实,我总是可以为 Kafka 消费者配置自动提交行为,但我宁愿控制我阅读消息的方式。
更新
关于这个话题,我们目前正在评估Reactive Kafka,他们在 0.8 版本的 kafka 中包含手动提交(这些人的荣誉)。这个特性将允许我们实现我们需要的 alod 行为。
servlets - 在 Servlet 中使用 java 9 Flows 的反应流用例?
我正在寻找在 servlet 容器(或只是 HTTP 服务器)中使用反应流的用例。
Jetty 项目已经开始被问到:“Jetty 是反应式的吗?” 我们已经注意到向 java 9 添加反应流的提议。
因此,我们已经开始了一些使用反应流 API 进行异步 servlet IO 的实验,这很有趣......但是由于我们缺乏真正的用例来关注哪些问题是最重要的,因此缺乏任何关注点。
是否有人可以分享/解释任何好的用例,以便我们可以指导我们的码头实验以满足他们的需求。我想象的那种事情是让基于 RS 的数据库发布者在 HTTP 响应或 websocket 连接上一直发送对象,使用 Flow.Processors 进行转换。
scala - 我可以实施自己的溢出策略吗?
是否有可能(或者将来可能)将我自己的OverflowStrategy实现为元素当前缓冲区的函数?还是有特定的理由不允许这样做?
谢谢!
scala - 如何从广播的 Akka 流中获取订阅者和发布者?
使用更复杂的图表时,我在让发布者和订阅者退出我的流程时遇到问题。我的目标是提供发布者和订阅者的 API,并在内部运行 Akka 流。这是我的第一次尝试,效果很好。
但是对于更复杂的广播图,我不确定如何获取订阅者和发布者对象?我需要部分图吗?
java - Java / Scala - ReactiveStreams / Enumeratee 如何持续读取输入流,直到 read() 返回 -1
目前我有一个 Publisher[InputStream] 我怎么知道读它有类似 Java8 Streams API 提供的东西:
目前我使用 Scala 和 playframework 并希望通过实验性的 akka-streams / reactivestreams 库对输出进行分块,但是我不知道,因为关于这两件事的文档很少甚至没有。有什么建议么?
scala - 与 Akka Streams 同步反馈
我想要实现的是用 akka 流实现类似同步反馈循环的东西。
假设你有一个Flow[Int].filter(_ % 5 == 0)
. 当您将Int
's 流广播到此流并直接在其后面压缩元组时,您会得到类似
有没有办法发出一个Option[Int]
,它指示在我推动下一个元素通过它之后流是否发出一个元素?
我想过实现我自己DetachedStage
的右前后后Flow
保持一个状态,每当流量拉到之前的舞台上时,我就知道他需要下一个元素。当后面的舞台没有收到元素时,它是None。
不幸的是,结果并不好,并且被许多职位所淘汰。
旁注
过滤器流只是一个例子,它可以是一个非常长的流,我无法提供Option
在它的每个阶段发出的能力,所以我真的必须知道,流是否推动了下一个或没有而是从下游请求下一个
我也玩过conflate
and expand
,但这些结果的位置偏移更糟
我在配置中更改的一件事是流的initial
缓冲区max
,因此我可以确定指示的需求确实是在我推动它的元素之后。
很高兴获得有关如何解决此问题的一些建议!
scala - 如何处理带有内部排队反应流订阅者的 OnComplete 消息?
我正在使用带有简单反应流的 Akka-Stream 1.0:
- 发布者发送 N 条消息
- 订阅者消费了 N 条消息
和
发布者将通过发送OnComplete
消息在 N 条消息(动态)后关闭流。
订阅者收到消息并立即进入canceled
状态。问题是,订阅者需要一些时间来处理每条消息,这意味着我通常会积压一些消息 - 随着订阅者的获取,这些消息无法再处理canceled
- 恕我直言ActorSubscriber.scala:195
处理消息意味着我的订阅者会将工作卸载给其他人(通过 Spray 发送内容ChunkedMessage
)并在消息完成后立即收到确认消息。由于 Actor 被取消,ack 消息永远不会被处理,并且 backlog 会被处理。
有什么建议让我完成积压工作?我可以“发明”我自己的“完成标记”,但这对我来说听起来很奇怪。显然,我的代码MaxInFlightRequestStrategy
最多可以使用 1 - 因为那里的需求总是只有 1 - 这意味着我从来没有积压的消息。
scala - 基于发布者的源不输出元素
我基于这样的 ReactiveStreams Publisher 为 Akka Stream 创建了一个 Source:
它将向 Flickr 发出搜索请求,并将结果作为JsValue
s 获取。我试图将它连接到许多不同的流和接收器,但这将是最基本的设置:
我看到onNext
被调用了几次,然后是onComplete
. 但是,Sink 没有收到任何东西。我错过了什么,这不是创建源的有效方法吗?
scala - 在 akka-stream 中,如何从期货集合中创建无序 Source
我需要akka.stream.scaladsl.Source[T, Unit]
从Future[T]
.
例如,有一个返回整数的期货集合,
如何创建一个
从中。
我不能使用Future.sequence
组合器,从那时起我会等待每个未来完成,然后再从源头获取任何东西。一旦任何未来完成,我想以任何顺序获得结果。
我知道这Source
是一个纯粹的功能性 API,它不应该在以某种方式实现它之前运行任何东西。所以,我的想法是使用一个Iterator
(这是懒惰的)来创建一个源:
但这将是期货的来源,而不是实际价值的来源。我也可以阻止next
使用Await.result(future)
,但我不确定哪个线程池的线程会被阻止。这也将按顺序调用期货,而我需要并行执行。
更新 2:事实证明有一种更简单的方法(感谢 Viktor Klang):
更新:这是我根据@sschaef 的回答得到的: