5

我正在与我无法控制的 java 库中的数据发布者合作。发布者库使用典型的回调设置;库代码中的某处(库是 java,但为了简洁起见,我将在 scala 中描述):

type DataType = ???

trait DataConsumer {
  def onData(data : DataType) : Unit
}

库的用户需要编写一个实现该onData方法的类并将其传递给 a DataProducer,库代码如下所示:

class DataProducer(consumer : DataConsumer) {...}

DataProducer有自己无法控制的内部线程,以及伴随的数据缓冲区,onData每当有另一个DataType对象要消耗时就会调用它。

所以,我的问题是:我如何编写一个层来将原始库模式转换/翻译成 akka 流Source对象?

先感谢您。

4

2 回答 2

6

回调 --> 源

详细说明 Endre Varga 的答案,下面是创建DataConsumer回调函数的代码,该函数将消息发送到 akka 流中Source

警告:创建一个功能性的 ActorPublish 比我在下面指出的要多得多。特别是,需要进行缓冲以处理DataProduceris 调用onData速度快于Sinkis 信令需求的情况(参见此示例)。下面的代码只是设置“接线”。

import akka.actor.ActorRef
import akka.actor.Actor.noSender

import akka.stream.Actor.ActorPublisher

/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
  def receive : Receive = {
    case message : DataType => deliverBuf() //defined in example link
  }    
}

class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
  override def onData(data : DataType) = sourceActor.tell(data, noSender)
}

//setup the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]

//setup the Consumer object that will feed the Actor
val actorConsumer = ActorConsumer(sourceActorRef)

//setup the akka stream Source
val source = Source(ActorPublisher[DataType](sourceActorRef))

//setup the incoming data feed from 3rd party library
val dataProducer  = DataProducer(actorConsumer)

回调 --> 整个流

最初的问题专门要求对 Source 进行回调,但如果整个流已经可用(而不仅仅是 Source),则处理回调更容易处理。那是因为可以ActorRef使用Source#actorRef函数将流具体化为一个。举个例子:

val overflowStrategy = akka.stream.OverflowStrategy.dropHead

val bufferSize = 100

val streamRef = 
  Source
    .actorRef[DataType](bufferSize, overflowStrategy)
    .via(someFlow)
    .to(someSink)
    .run()

val streamConsumer = new DataConsumer {
  override def onData(data : DataType) : Unit = streamRef ! data
} 

val dataProducer = DataProducer(streamConsumer)
于 2015-06-13T15:01:21.210 回答
1

有多种方法可以解决这个问题。一种是使用ActorPublisher:http ://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors ,您可以在其中更改回调,以便它向actor发送消息。根据回调的工作方式,您也可以使用 mapAsync(将回调转换为 Future)。仅当一个请求恰好产生一个回调调用时,这才有效。

于 2015-04-07T10:49:50.560 回答