我有一个被设计用于 akka-io acking 的 Actor,这样它在向上游(到网络)发送消息时会等待 Ack。这个actor是后端异步应用程序的接口。
我想要一个包装层,它允许我将此 Actor 转换为 akka-streams Flow[Incoming, Outgoing, ???]
,以便它可以与期望这种签名的较新库集成。
(来自上游的消息很少,所以我们不太关心那里的背压,但拥有它并不是一件坏事。)
sealed trait Incoming //... with implementations
sealed trait Outgoing //... with implementations
object Ack
// `upstream` is an akka-io connection actor that will send Ack
// when it writes an Outgoing message to the socket
class SimpleActor(upstream: Actor) extends Actor {
def receive = {
case in: Incoming if sender() == upstream =>
// does some work in response to upstream
case other =>
// does some work in response to downstream
// including sending messages to upstream and
// `becoming` a stashing state waiting for Ack
// to `unbecome`, then sending Ack downstream
// (which will respect the backpressure).
}
}
我从 akka-user 邮件列表中得到了很好的授权,在 akka-streams 中没有将 Actor 与流集成的代码,并且为了将 Actor 插入 Stream 并保留基于 Ack 的背压,必须实现 推拉阶段。
看来我们实际上需要两个PushPullStage
s ......一个 forupstream => SimpleActor
和一个 for SimpleActor => upstream
。
我的问题是:
- 是否有任何库提供诸如演员和流之间的集成?
PushPullStage
有没有比从头开始实现双向更简单的方法?- 是否有任何现有的测试框架可以允许对这样的实现进行压力测试?