问题标签 [akka-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 如何将 Source[ByteString, Any] 转换为 InputStream
akka-http 表示使用 multipart/form-data 编码上传的文件为Source[ByteString, Any]
. 我需要使用需要InputStream
.
怎么Source[ByteString, Any]
能变成一个InputStream
?
scala - Akka-http 使用 Stream 处理请求
我尝试编写一些简单的基于 akka-http 和 akka-streams 的应用程序来处理 http 请求,总是使用一个预编译流,因为我计划在我的 requestProcessor 流中使用带有背压的长时间处理
我的申请代码:
我找到了有关如何创建可以动态接受要处理的新项目的 Source 的解决方案,但是我可以找到有关如何在我的路由中获取流执行结果的任何解决方案
scala - 访问由 Source.actorRef 创建的 akka 流 Source 的底层 ActorRef
我正在尝试使用Source.actorRef方法来创建akka.stream.scaladsl.Source对象。某种形式的东西
我的问题是:如何将数据发送到基于 ActorRef 的 Source 对象?
我认为向 Source 发送消息是某种形式
但weatherSource
没有!
运算符或tell
方法。
该文档对如何使用 Source.actorRef 没有太多描述性,它只是说您可以...
提前感谢您的评论和回复。
scala - 超过配置的最大打开请求数
最近我开始使用 akka 流构建一些小型 Web 处理服务。这很简单,我从 redis 中提取 url,然后我下载这些 url(它们是图像),稍后我正在处理图像,并将它们推送到 s3 和一些 json 到 redis。
我正在从多个站点下载许多不同类型的图像,我收到了一大堆错误,例如 404、意外断开连接、响应内容长度 17951202 超过了配置的 8388608 限制、EntityStreamException:实体流截断和重定向。通过重定向,我正在调用 requestWithRedirects ,其地址位于响应的位置标头中。
负责下载的部分差不多是这样的:
TimeoutFuture 非常简单,它需要未来和超时。如果未来的时间超过超时,它会返回其他未来的超时异常。我遇到的问题是:一段时间后我收到一个错误:
我不确定可能是什么问题,但我认为我有一些下载没有正确完成,一段时间后它们会留在一些全局连接池中,导致提到的错误。任何想法可能导致问题?或者如何尝试找到问题的根源:我已经测试了 404 响应,并且响应内容长度超出了...错误,它们似乎不是我的麻烦制造者。
编辑:很可能问题出在我的 TimeoutFuture 上。我用这里描述的错误填充它https://stackoverflow.com/a/29330010/2963977但在我看来,实际上下载图像的未来永远不会完成,它正在占用我的连接池资源。
我想知道为什么这些设置对我的情况没有任何影响:
编辑2:
显然还不支持超时。这是我的错误报告 https://github.com/akka/akka/issues/17732#issuecomment-112315953
akka - 不允许使用 Akka Stream OnNext
我只是按照 akka 流 ActorPublisher 示例进行操作,有时我收到了以下消息:
java.lang.IllegalStateException:当流没有请求元素时不允许 onNext,totalDemand 为 0
查看文档,他们解释说:
通过调用 onNext 将元素发送到流。您可以发送流订阅者请求的尽可能多的元素。这个金额可以通过totalDemand查询。只有在isActive且totalDemand>0时才允许使用onNext,否则onNext会抛出IllegalStateException。
当流订阅者请求更多元素时,ActorPublisherMessage.Request 消息将传递给该参与者,您可以对该事件采取行动。totalDemand 会自动更新。
如何防止 totalDemand 为零?当我收到此错误时,我丢失了我试图发送的消息。
这是我一直在关注的示例:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html
这是我的课堂测试
好吧,我收到了来自 kafka 的消息,并且我正在传递给 WorkerActor,但是当向 Kafka 发送大约 10 条消息/秒时,其中一些消息由于这个错误而丢失了。
更新
我遇到了这里描述的错误(使用相同的库):
https://github.com/softwaremill/reactive-kafka/issues/11
我使用缓冲区解决了我的问题,但看起来这个 PR 可以解决问题。
java - 如何响应参与者调用的结果?
我们正在研究使用 Akka-HTTP Java API - 使用路由 DSL。
不清楚如何使用路由功能来响应 HttpRequest;使用无类型的 Akka Actor。例如,在匹配 Route 路径时,我们如何将请求交给“处理程序”ActorRef,然后它会以异步方式以 HttpResponse 响应?
Akka-User 邮件列表上发布了一个类似的问题,但没有类似的后续解决方案 - https://groups.google.com/d/msg/akka-user/qHe3Ko7EVvg/KC-aKz_o5aoJ。
scala - 如何创建一个可以稍后通过方法调用接收元素的 Source?
我想在Source
其上创建一个稍后推送元素,例如:
推荐的方法是什么?
谢谢!
scala - 我可以实施自己的溢出策略吗?
是否有可能(或者将来可能)将我自己的OverflowStrategy实现为元素当前缓冲区的函数?还是有特定的理由不允许这样做?
谢谢!
java - Akka Stream 流描述中的直播资源
akka-stream 文档中有此注释,说明如下:
... 可重用的流描述不能绑定到“实时”资源,任何与此类资源的连接或分配都必须推迟到实现时间。“实时”资源的示例是已经存在的 TCP 连接、多播发布者等;…
关于笔记我有几个问题:
- 除了这两个例子,还有什么资源可以算作live?
- 任何不能安全(深度)复制的东西?像一个
Thread
? - 我是否也应该避免共享任何不是线程安全的东西?
- 任何不能安全(深度)复制的东西?像一个
- 使用的
ActorRef
现有的呢?ActorSystem
ActorFlowMaterializer
- 如何将分配推迟到实现时间?例如在 a 的构造函数中分配它
PushPullStage
而不是在 a 的 create 函数中分配它是否安全FlowGraph
?