6

我正在使用 PlayFramework 2.5.3 并想akka.stream.scaladsl.Source从一个akka.event.EventStream(事件流是演员系统的一部分)创建一个。事件流会产生某种类型的事件,因此我需要订阅该某种类型的事件并使用play.api.mvc.Results.chunked. Source有没有什么简单的方法可以使用 Akka Streams 2.4.5创建这样的?

4

1 回答 1

5

您可以Source.actorRef与订阅一起使用。Source.actorRef是一个具体化为 的源ActorRef,因此您可以这样做:

// choose the buffer size of the actor source and how the actor
// will react to its overflow
val eventListenerSource = Source.actorRef[YourEventType](32, OverflowStrategy.dropHead)

// run the stream and obtain all materialized values
val (eventListener, ...) = eventListenerSource
    .viaMat(...)(Keep.left)
    <...>
    .run()

// subscribe the source actor to the stream
actorSystem.eventStream.subscribe(eventListener, classOf[YourEventType])

// now events emitted by the source will go to the actor
// and through it to the stream

请注意,actorRefsource 有一定的限制,例如,它的内部缓冲区自然不支持背压溢出策略。您可能希望Source.actorPublisher与扩展ActorPublisher[YourEventType]特征的演员一起使用,它会给您更多的控制权。然而,由于EventStream它是一个纯粹的基于推送的源,你将无法ActorPublisher比 with做更多的事情Source.actorRef,所以你不妨使用更简单的方法。

于 2016-05-19T15:58:32.260 回答