0
/**
 * Actor that acts as both a stream subscriber and a stream publisher of `Int`:
 * every element received from upstream via `OnNext` is buffered and forwarded
 * downstream, in arrival order, as soon as downstream demand allows.
 *
 * Each buffered element is emitted at most once. Emission is attempted both
 * when a new element arrives and when downstream signals more demand, so an
 * element that arrives while demand is already outstanding is not left waiting
 * for a later `Request`.
 *
 * NOTE(review): upstream is requested one element at a time regardless of
 * downstream demand, so the buffer is unbounded if downstream is slower than
 * upstream — confirm this is acceptable for the intended use.
 */
class ActorPubSub extends ActorSubscriber with ActorPublisher[Int] {
  // Elements received from upstream and not yet emitted downstream, oldest first.
  var events: Seq[Int] = Vector.empty[Int]

  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  override def receive: Actor.Receive = {
    case OnNext(e: Int) =>
      // Append (not prepend) so elements leave in the order they arrived.
      events = events :+ e
      deliverBuffered()
    case Request(_) =>
      // The requested count is not used directly: `totalDemand` is the
      // publisher's running total of outstanding demand and is a Long, so
      // nothing is truncated by a narrowing conversion.
      deliverBuffered()
  }

  /** Emits as many buffered elements as current downstream demand permits and drops them from the buffer. */
  private def deliverBuffered(): Unit = {
    val deliverable = math.min(totalDemand, events.size.toLong).toInt
    if (isActive && deliverable > 0) {
      val (ready, remaining) = events.splitAt(deliverable)
      // Update the buffer before emitting so emitted elements can never be sent twice.
      events = remaining
      ready.foreach(onNext)
    }
  }
}

// Start a single ActorPubSub actor; the same ActorRef is wrapped twice below,
// once as the publishing side and once as the subscribing side.
val pubsubRef = system.actorOf(Props(new ActorPubSub))
val pub = ActorPublisher[Int](pubsubRef)
val sub = ActorSubscriber[Int](pubsubRef)
// Build one Flow out of the Sink (writes into the actor) and the Source
// (reads from the actor). The two halves are only linked through the actor.
val pubsubFlow = Flow(Sink(sub), Source(pub))

// Wire the numbers 1..10 through pubsubFlow and print every element that
// comes out the other end, then run the graph.
FlowGraph { implicit b =>
  // Brings the `~>` connection operator into scope.
  import akka.stream.scaladsl.FlowGraphImplicits._

  Source((1 to 10).toList) ~> pubsubFlow ~> Sink.foreach[Int](e =>
    println("Got a number " + e)
  )
}.run()

根据 `Flow.apply(Sink, Source)` 的文档:

从看似断开的 Source 和 Sink 对创建流。

如果这是真的,为什么图仍然没有连接?

4

1 回答 1

0

Endre(来自 akka-user 邮件列表)的回复:

这是一个已知问题,将在 M4 中完全修复。

于 2015-02-19T08:07:16.343 回答