问题标签 [akka-stream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
1065 浏览

scala - 如何关闭 Akka-stream Tcp 服务器中的连接?

在我们基于 akka-streams 的 Tcp 服务器中,close() 的预期行为是客户端应该立即看到 SocketException,并且在异常之后发布的任何消息都不应该传递到服务器。但是,必须传递异常之前发送的最后一条消息。所有新客户端都应该遇到 ConnectException。

无法破译akka-streams 1.0 文档中关于“关闭连接”的段落,我们正在为 Tcp 服务器使用如下实现 close():

这是关闭服务器的好方法吗?(用于部署等)

如果有人可以审查和评论上述 close() 实现的有效性。我们试图证明或反驳自己此代码是否满足所有要求,并想知道我们是否应该首先成功流的源,或者先解除绑定 serverBinding ?非常感谢任何评论/批评。提前致谢 !

0 投票
2 回答
648 浏览

scala - Akka Streams 是否利用 Akka Actors?

我开始学习Akka Streams,这是一个用于处理具有背压功能的数据的框架。该库是Akka的一部分,它把自己描述为:

Akka 是一个工具包和运行时,用于在 JVM 上构建高度并发、分布式和弹性的消息驱动应用程序。

这些能力来自 Akka actor 的本质。但是,在我看来,流处理和演员是互不相关的概念。

问题:Akka Streams 是否利用了 Akka Actor 的这些特性?如果是,您能解释一下演员如何帮助流媒体吗?

0 投票
1 回答
568 浏览

akka - Akka-streams:如何将流连接到流?

如果流 A 产生

如何将此出口与在源上映射的 Flow B 入口连接。例如 Flow B 的出口是

0 投票
1 回答
696 浏览

scala - 与 Akka Streams 同步反馈

我想要实现的是用 akka 流实现类似同步反馈循环的东西。

假设你有一个Flow[Int].filter(_ % 5 == 0). 当您将Int's 流广播到此流并直接在其后面压缩元组时,您会得到类似

有没有办法发出一个Option[Int],它指示在我推动下一个元素通过它之后流是否发出一个元素?

我想过实现我自己DetachedStage的右前后后Flow保持一个状态,每当流量拉到之前的舞台上时,我就知道他需要下一个元素。当后面的舞台没有收到元素时,它是None。

不幸的是,结果并不好,并且被许多职位所淘汰。

旁注

过滤器流只是一个例子,它可以是一个非常长的流,我无法提供Option在它的每个阶段发出的能力,所以我真的必须知道,流是否推动了下一个或没有而是从下游请求下一个

我也玩过conflateand expand,但这些结果的位置偏移更糟

我在配置中更改的一件事是流的initial缓冲区max,因此我可以确定指示的需求确实是在我推动它的元素之后。

很高兴获得有关如何解决此问题的一些建议!

0 投票
1 回答
1254 浏览

scala - Akka-stream:在 mapAsync 完成处理数据之前调用 onComplete

我目前正在研究一个简单的批处理,它使用 AKKA 流 1.0 来处理数据。如果我避免在流程步骤中使用 mapAsync 方法,一切都会顺利进行。

当 on complete 被调用时,结果文件被最终确定,并且代理系统通过 Reaper actor 关闭(参见 Reaper 模式):

我要加速的步骤之一是丰富数据的部分。有时,数据无法丰富,应该在下一步忽略。

所有这些代码都运行良好,并且在调用 onComplete() 时我没有丢失元素。

当我尝试使用 mapAync 和 Future 而不是 map 进行丰富步骤来加速事物时,会在处理所有元素之前调用 onComplete。

我最后错过了一些元素,而且数字永远不会相同所有这些代码运行良好,并且在调用 onComplete() 时我没有丢失元素。

我找不到方法来发现所有内容都已处理...知道我做错了什么吗?

0 投票
1 回答
63 浏览

akka - Akka 流和 Http。1.0-RC4 版本中的外部 Http 请求

尝试创建 http 客户端/流时,此代码似乎不再起作用:

flow方法似乎不再可用。它真的被弃用了吗?以下是现在创建 Http 连接的首选方式吗?

0 投票
0 回答
608 浏览

scala - 如何配置 akka-http 在请求超时的情况下返回 HTTP 503

我有一个使用 akka-http(1.0 版)实现的 Web 应用程序。它适用于少量(少于 40 个)并发用户。但是,当用户数量达到 40 时,我始终在日志中看到以下错误:

'您尝试订阅的发布者 (akka.stream.impl.MultiStreamOutputProcessor$SubstreamOutput@6d094473) 已被关闭,因为超过了订阅超时。

返回的响应是 HTTP 500 Internal server error。

使用什么配置属性来控制此超时?

是否可以将 akka-http 配置为在超时时自动返回 HTTP 503?

0 投票
1 回答
499 浏览

scala - 在 akka-http 中使用 handleWebsocketMessage

使用akka-http,我怎样才能构造一个只监听传入数据但不写回任何东西的Flow[Message, Message, _] 传递给它?handleWebsocketMessage无论如何都可以使用接收器吗?因为 Sink 听起来像是我需要的。

0 投票
1 回答
1079 浏览

scala - Akka-Http + Twitter streaming API

I recently got an interest into trying out some of the streaming capabilities in the Scala world. This happened while I was reading up on the Iteratee API in Play 2.

However, people seem to think the Iteratee API is close to deprecation, and have recommended me one of the following libs:

  • scalaz-stream
  • akka-streams, or more specifically, akka-http

I didn't really feel like getting into the scalaz world, so I decided to check out akka-http.

Unfortunately, documentation seems to be very sparse on the subject of akka-http at the moment, and I'm having a lot of trouble getting everything to work.

As per usual, I chose everyone's favourite source of streaming data to play around with: Twitter.

Googling the subject mostly leads to either

  • Matthias Nehlsens's excellent work on his BirdWatch project. Unfortunately, he still uses the Iteratees.
  • People using akka-streams with a Twitter client, but I'm not very fond of using those, as I won't be really learning much from those.

Performing a basic GET request with akka-http seems to be structured somewhat like this:

The code above works (though parsing the body of the results is pretty annoying for some reason).

When I try to do the same but pointing to the /1.1/statuses/sample.json endpoint (which should emit an example stream), my Future just sits there and times out. While this may seem logical given the streaming nature of the data, this should instead return a 404, because at this point I'm not even doing proper OAuth.

For reference, this is my code so far at this point:

Like I said, I figured that maybe the streaming nature of this might be causing the problem regardless, so I tried to change my code to handle the chunks one by one based on the only examples I could find, as shown here:

This causes the application to terminate immediately. Remove the .onComplete clause keeps the ActorSystem and the application running, but nothing seems to actually happen :'(

Does anyone have any experience with this? The library has been one huge headache so far. Should I go back to Play + WS + Iteratees?

0 投票
1 回答
2389 浏览

scala - 如何为 Http().bindAndHandle 组合不同的路由?

我的应用程序结构看起来像

EmailService.scala

MonitorStatusService.scala

服务器.scala

我需要的? 我想绑定多条路线,例如

但是当我这样做时,我看到错误为

在此处输入图像描述

我该怎么办?