回调 --> 源
详细说明 Endre Varga 的答案,下面是创建DataConsumer
回调函数的代码,该函数将消息发送到 akka 流中Source
。
警告:创建一个功能性的 ActorPublish 比我在下面指出的要多得多。特别是,需要进行缓冲以处理DataProducer
is 调用onData
速度快于Sink
is 信令需求的情况(参见此示例)。下面的代码只是设置“接线”。
import akka.actor.ActorRef
import akka.actor.Actor.noSender
import akka.stream.Actor.ActorPublisher
/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
def receive : Receive = {
case message : DataType => deliverBuf() //defined in example link
}
}
class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
override def onData(data : DataType) = sourceActor.tell(data, noSender)
}
//setup the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]
//setup the Consumer object that will feed the Actor
val actorConsumer = ActorConsumer(sourceActorRef)
//setup the akka stream Source
val source = Source(ActorPublisher[DataType](sourceActorRef))
//setup the incoming data feed from 3rd party library
val dataProducer = DataProducer(actorConsumer)
回调 --> 整个流
最初的问题专门要求对 Source 进行回调,但如果整个流已经可用(而不仅仅是 Source),则处理回调更容易处理。那是因为可以ActorRef
使用Source#actorRef函数将流具体化为一个。举个例子:
val overflowStrategy = akka.stream.OverflowStrategy.dropHead
val bufferSize = 100
val streamRef =
Source
.actorRef[DataType](bufferSize, overflowStrategy)
.via(someFlow)
.to(someSink)
.run()
val streamConsumer = new DataConsumer {
override def onData(data : DataType) : Unit = streamRef ! data
}
val dataProducer = DataProducer(streamConsumer)