我正在使用 PlayFramework 2.5.3 并想akka.stream.scaladsl.Source
从一个akka.event.EventStream
(事件流是演员系统的一部分)创建一个。事件流会产生某种类型的事件,因此我需要订阅该某种类型的事件并使用play.api.mvc.Results.chunked
. Source
有没有什么简单的方法可以使用 Akka Streams 2.4.5创建这样的?
问问题
719 次
1 回答
5
您可以Source.actorRef
与订阅一起使用。Source.actorRef
是一个具体化为 的源ActorRef
,因此您可以这样做:
// choose the buffer size of the actor source and how the actor
// will react to its overflow
val eventListenerSource = Source.actorRef[YourEventType](32, OverflowStrategy.dropHead)
// run the stream and obtain all materialized values
val (eventListener, ...) = eventListenerSource
.viaMat(...)(Keep.left)
<...>
.run()
// subscribe the source actor to the stream
actorSystem.eventStream.subscribe(eventListener, classOf[YourEventType])
// now events emitted by the source will go to the actor
// and through it to the stream
请注意,actorRef
source 有一定的限制,例如,它的内部缓冲区自然不支持背压溢出策略。您可能希望Source.actorPublisher
与扩展ActorPublisher[YourEventType]
特征的演员一起使用,它会给您更多的控制权。然而,由于EventStream
它是一个纯粹的基于推送的源,你将无法ActorPublisher
比 with做更多的事情Source.actorRef
,所以你不妨使用更简单的方法。
于 2016-05-19T15:58:32.260 回答