0

我有一个被设计用于 akka-io acking 的 Actor,这样它在向上游(到网络)发送消息时会等待 Ack。这个actor是后端异步应用程序的接口。

我想要一个包装层,它允许我将此 Actor 转换为 akka-streams Flow[Incoming, Outgoing, ???],以便它可以与期望这种签名的较新库集成。

(来自上游的消息很少,所以我们不太关心那里的背压,但拥有它并不是一件坏事。)

sealed trait Incoming //... with implementations
sealed trait Outgoing //... with implementations
object Ack

// `upstream` is an akka-io connection actor that will send Ack
// when it writes an Outgoing message to the socket
class SimpleActor(upstream: Actor) extends Actor {
  def receive = {
    case in: Incoming if sender() == upstream =>
       // does some work in response to upstream
    case other =>
       // does some work in response to downstream
       // including sending messages to upstream and
       // `becoming` a stashing state waiting for Ack
       // to `unbecome`, then sending Ack downstream
       // (which will respect the backpressure).
  }
}

我从 akka-user 邮件列表中得到了很好的授权,在 akka-streams 中没有将 Actor 与流集成的代码,并且为了将 Actor 插入 Stream 并保留基于 Ack 的背压,必须实现 推拉阶段

看来我们实际上需要两个PushPullStages ......一个 forupstream => SimpleActor和一个 for SimpleActor => upstream

我的问题是:

  1. 是否有任何库提供诸如演员和流之间的集成?
  2. PushPullStage有没有比从头开始实现双向更简单的方法?
  3. 是否有任何现有的测试框架可以允许对这样的实现进行压力测试?
4

2 回答 2

5

我认为 akka-stream 的理念是提供低级砖块并在其之上构建高级工具。如果您查看我们最近发布的开源库https://github.com/MfgLabs/akka-stream-extensions,您会发现我们确实做到了。我们提供了一些有用的结构,使管理速率限制器、状态处理器、惰性和生成器等变得更容易......对于actor集成,我认为应该可以创建某种帮助器,以便更容易地将actor与akka集成-流试图传播背压。Akka-Stream 还很年轻,生态系统还在不断发展;)

于 2015-05-31T10:49:52.540 回答
2

是的,您可以将演员与流集成。
为此目的有特殊的演员:演员发布者和演员订阅者。

都在这里:http ://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

当然,您必须以这样的方式编写演员,使其与流背压一起工作。但是您不需要推拉阶段。

于 2015-05-27T14:00:08.527 回答