问题标签 [akka-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2439 浏览

scala - Akka Stream 和 HTTP Scala:如何从路由向 Actor 发送消息

我玩的是akka-stream-and-http-experimental1.0。到目前为止,我有一个可以接受和响应 HTTP 请求的用户服务。我还将有一个可以管理约会的约会服务。为了进行约会,必须是现有用户。如果用户存在,约会服务将与用户服务进行检查。现在这显然可以通过 HTTP 完成,但我宁愿让约会服务向用户服务发送消息。作为新手,我不清楚如何使用演员(作为akka-http抽象)来发送和接收消息。文档中提到了ActorRefand ActorPublisher,但没有前者的例子,后者看起来像我需要的过度杀伤力。我的代码如下所示,位于Github上:

编辑:我想出了如何发送消息,这可以使用Source.actorRef. 那只会将消息发送到流中。我想做的是让路由处理程序类接收响应。这样,当我创建约会服务时,它的参与者可以调用用户服务参与者并以与我示例中的用户路由处理程序相同的方式接收响应。伪代码:

val src = Source.single(name) \\ How to send this to an actor and get the response

编辑 2:基于@yardena 的回答,我想出了以下内容,但最后一行没有编译。我的演员发布者返回 aFuture我猜它会被包装在 a 中Promise,然后作为 a 传递Future给路由处理程序。

0 投票
1 回答
1594 浏览

scala - Akka 流如何持续实现?

我在 Scala 中使用Akka Streams使用AWS Java SDK从AWS SQS队列进行轮询。我创建了一个ActorPublisher,它以两秒的间隔将消息出列:

在我的应用程序中,我也尝试以 2 秒的间隔运行流程:

但是,当我运行我的应用程序时,我会收到java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds]ActorMaterializer.

是否有推荐的方法来持续实现 Akka 流?

0 投票
0 回答
336 浏览

scala - Akka Http 无响应

我有一个响应式 Akka Http 应用程序,做了一些更改,然后收到了这些消息:

我无法弄清楚它们的意思。文档中没有提到它们,也没有谷歌点击。有人可以分享一些见解吗?

0 投票
2 回答
369 浏览

scala - 如何从 Akka Stream Graph 获得物化结果?

我试图弄清楚如何从 scala Akka Stream 图中获得物化结果。

我正在使用"com.typesafe.akka" %% "akka-stream-experimental" % "1.0".

我查看了文档,但找不到任何示例。

所以,假设我有一个代码

我想从图中得到结果,g但它返回 Unit。如何处理?

谢谢你。

0 投票
2 回答
556 浏览

scala - 使用 akka 流对数据库的 TCP 请求

我正在尝试使用 akka-streams 的客户端向数据库发送查询Tcp,但我不明白我缺少什么。

所以我有两种类型QueryResponse它们可以完美地与akka's 相互转换ByteString。所以我正在创建一个与 的客户端连接val conn = Tcp().outgoingConnection("localhost", 28015),这给了我一个Flow[ByteString, ByteString, Future[OutgoingConnection]],到目前为止一切都很好。所以我假设源是我对查询的请求,我找不到用查询源提供这个流的最佳方法,而是像构建它一样Source(Future.successful(query)),并将它连接到流source.via(flow),这给了我另一个Source[Response, Unit]。在这里我无法理解如何获取Future[Response],尝试了几个组合器,但它给了我Materialized价值,我不完全理解它与流程中的值/类型的关系。

0 投票
1 回答
216 浏览

scala - 从 PushPullStage 发出多个对象

我一直在玩 Akka-Streams,我正在尝试Flow通过实现我自己的PushPullStage. 我希望Flow将从上游接收到的对象累积到一个列表中,并在上游完成时根据某些功能将它们分组,然后再将组发送到下游。

实现起来似乎很简单,但我不知道该怎么做!似乎没有办法从PushPullStage.

到目前为止,这是我的实现:

编辑

我更改了代码以考虑背压,现在一切正常。基本上我只需要让下游Flow做他们应该做的事情并继续拉动元素:

0 投票
2 回答
847 浏览

scala - 如何使用异步阶段创建 Akka 流真正的拉流

我正在尝试创建一个提供 OAuth2 令牌并且还负责刷新过期令牌的 Source。目前我的代码看起来有点像这样

此代码的输出如下所示

显然,这个 mapAsync(1) 在不期望的时候产生了需求(预取?)

有2个问题:

  • 需求导致上游不需要的令牌请求
  • 令牌的预取/缓存是有问题的,因为它们仅在特定的时间内有效

那么如何创建一个行为类似于此函数的真正拉流呢?

def tokenSource: () => Future[Token]

0 投票
1 回答
2390 浏览

json - Akka HTTP:如何将 Json 格式响应解组为域对象

我正在尝试 Akka HTTP,并且我创建了一个服务,该服务在 HttpResponse 中返回域对象的 Json 数组。在客户端中,我想将其转换为域对象的源,以便后续流和接收器可以使用它。

参考 Json 支持部分: http ://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/http/common/json-support.html

我已经完成了定义隐式 RootJsonReader 等的必要工作,但我不知道如何使用 FromEntityUnmarshaller。

我的代码在这里: https ://github.com/charlesxucheng/akka-http-microservice

它基于 akka-http-microservice 激活器模板。Service2.scala 是我的服务器实现并且正在运行。AkkaHttpClient.scala 是客户端实现,它是不完整的。

要构建,请使用 Gradle,因为 build.sbt 不是最新的。

谢谢。

0 投票
2 回答
2790 浏览

scala - Akka-HTTP:文件上传

我正在尝试使用 akka http 实现简单的文件上传。我的尝试如下所示:

这段代码有几个问题:

  1. 无法上传大于配置实体大小的文件: Request Content-Length 24090745 exceeds the configured limit of 8388608
  2. 连续执行两次上传会导致dead letters encountered.异常。

克服大小限制的最佳方法是什么?如何正确关闭文件,以便后续上传将覆盖现有文件(暂时忽略并发上传)?

0 投票
1 回答
1342 浏览

scala - Akka/Scala:你能解释一下这个 Akka Streams Flow 中发生了什么吗?

我正在研究 Akka 流,并做了一个斐波那契发布者 - 订阅者示例,如下所示。但是,我还不太明白需求最初是如何产生的,以及它与订阅者的请求策略有什么关系。有人可以解释一下吗?

斐波那契出版社:

斐波那契订阅者:

斐波那契应用程序:

样品运行:问题:最初对 4 的需求是从哪里来的?