4

假设这个 API 是给定的,我们不能改变它:

object ProviderAPI {

  trait Receiver[T] {
    def receive(entry: T)
    def close()
  }

  def run(r: Receiver[Int]) {
    new Thread() {
      override def run() {
        (0 to 9).foreach { i =>
          r.receive(i)
          Thread.sleep(100)
        }
        r.close()
      }
    }.start()
  }
}

在此示例中,ProviderAPI.run接受 a Receiver,调用receive(i)10 次然后关闭。通常,ProviderAPI.runreceive(i)根据可能是无限的集合调用。

此 API 旨在以命令式风格使用,例如外部迭代器。如果我们的应用程序需要过滤、映射和打印这个输入,我们需要实现一个接收器来混合所有这些操作:

object Main extends App {
  class MyReceiver extends ProviderAPI.Receiver[Int] {
    def receive(entry: Int) {
      if (entry % 2 == 0) {
        println("Entry#" + entry)
      }
    }
    def close() {}
  }

  ProviderAPI.run(new MyReceiver())
}

现在,问题是如何在函数式风格、内部迭代器中使用 ProviderAPI(不改变提供给我们的 ProviderAPI 的实现)。请注意,ProviderAPI 也可以receive(i)无限次调用,因此不能将所有内容收集到一个列表中(此外,我们应该逐个处理每个结果,而不是先收集所有输入,然后再处理)。

我在问如何实现这样的ReceiverToIterator,以便我们可以在功能样式中使用 ProviderAPI:

object Main extends App {
  val iterator = new ReceiverToIterator[Int]  // how to implement this?
  ProviderAPI.run(iterator)
  iterator
    .view
    .filter(_ % 2 == 0)
    .map("Entry#" + _)
    .foreach(println)
}

更新

这里有四种解决方案:

4

5 回答 5

1

更新:1 个条目的 BlockingQueue

您在这里实现的本质上是 Java 的 BlockingQueue,队列大小为 1。

主要特点:超级阻塞。一个缓慢的消费者会扼杀你的生产者的表现。

更新: @gzm0 提到 BlockingQueue 不包括 EOF。为此,您必须使用 BlockingQueue[Option[T]] 。

更新:这是一个代码片段。它可以制作成适合您的Receiver.
其中一些灵感来自Iterator.buffered. 请注意,这peek是一个具有误导性的名称,因为它可能会阻止 - hasNext.

// fairness enabled -- you probably want to preserve order...
// alternatively, disable fairness and increase buffer to be 'big enough'
private val queue = new java.util.concurrent.ArrayBlockingQueue[Option[T]](1, true)

// the following block provides you with a potentially blocking peek operation
// it should `queue.take` when the previous peeked head has been invalidated
// specifically, it will `queue.take` and block when the queue is empty
private var head: Option[T] = _
private var headDefined: Boolean = false
private def invalidateHead() { headDefined = false }
private def peek: Option[T] = {
  if (!headDefined) {
    head = queue.take()
    headDefined = true
  }
  head
}

def iterator = new Iterator[T] {

  // potentially blocking; only false upon taking `None`
  def hasNext = peek.isDefined

  // peeks and invalidates head; throws NoSuchElementException as appropriate
  def next: T = {
    val opt = peek; invalidateHead()
    if (opt.isEmpty) throw new NoSuchElementException
    else opt.get
  }
}

替代方案:迭代者

基于迭代器的解决方案通常会涉及更多的阻塞。从概念上讲,您可以在执行迭代的线程上使用延续来避免阻塞线程,但是延续与 Scala 的 for-comprehensions 混淆,因此在这条路上没有乐趣。

或者,您可以考虑基于迭代的解决方案。迭代器与迭代器不同,消费者不负责推进迭代——生产者负责。使用迭代器,消费者基本上会随着时间的推移折叠生产者推送的条目。在每个下一个条目可用时折叠它可以在线程池中进行,因为在每次折叠完成后线程都会被放弃。

你不会在迭代中得到很好的 for 语法,而且学习曲线有点挑战,但是如果你对使用 a 有信心,foldLeft你最终会得到一个看起来很合理的非阻塞解决方案。

要了解有关 iteratee 的更多信息,我建议查看PlayFramework 2.X 的 iteratee 参考。文档描述了他们独立的 iteratee 库,它在 Play 环境之外 100% 可用。Scalaz 7 也有一个全面的迭代库。

于 2013-05-01T21:49:47.003 回答
0

发布主题解决方案

使用PublishSubjectNetflix RxJava-Scala API 的一个非常简单的解决方案:

// libraryDependencies += "com.netflix.rxjava" % "rxjava-scala" % "0.20.7"

import rx.lang.scala.subjects.PublishSubject

class MyReceiver[T] extends ProviderAPI.Receiver[T] {
  val channel = PublishSubject[T]()
  def receive(entry: T) { channel.onNext(entry) }
  def close() { channel.onCompleted() }
}

object Main extends App {
  val myReceiver = new MyReceiver[Int]()
  ProviderAPI.run(myReceiver)
  myReceiver.channel.filter(_ % 2 == 0).map("Entry#" + _).subscribe{n => println(n)}
}
于 2013-12-05T05:14:03.497 回答
0

ReceiverToTraversable

当我想使用 svnkit.com API 列出和处理 svn 存储库时,出现了这个 stackoverflow 问题,如下所示:

SvnList svnList = new SvnOperationFactory().createList();
svnList.setReceiver(new ISvnObjectReceiver<SVNDirEntry>() {
  public void receive(SvnTarget target, SVNDirEntry dirEntry) throws SVNException {
    // do something with dirEntry
  }
});
svnList.run();

API 使用了回调函数,我想改用函数式样式,如下所示:

svnList.
  .filter(e => "pom.xml".compareToIgnoreCase(e.getName()) == 0)
  .map(_.getURL)
  .map(getMavenArtifact)
  .foreach(insertArtifact)

我想有一个类ReceiverToIterator[T]extends ProviderAPI.Receiver[T]with Iterator[T],但这需要 svnkit api 在另一个线程中运行。这就是为什么我问如何使用在新线程中运行的 ProviderAPI.run 方法来解决这个问题。但这不是很明智:如果我解释了真实案例,之前可能有人已经找到了更好的解决方案

解决方案

如果我们解决真正的问题(因此,不需要为 svnkit 使用线程),一个更简单的解决方案是实现 ascala.collection.Traversable而不是 a scala.collection.Iterator。虽然Iterator需要一个nexthasNextdef,但Traversable需要一个foreachdef,这与svnkit回调非常相似!

请注意,通过使用view,我们使转换器变得懒惰,因此元素通过所有链一一传递到foreach(println)。这允许处理无限的集合。

object ProviderAPI {
  trait Receiver[T] {
    def receive(entry: T)
    def close()
  }

  // Later I found out that I don't need a thread
  def run(r: Receiver[Int]) {
    (0 to 9).foreach { i => r.receive(i); Thread.sleep(100) }
  }
}

object Main extends App {
  new ReceiverToTraversable[Int](r => ProviderAPI.run(r))
    .view
    .filter(_ % 2 == 0)
    .map("Entry#" + _)
    .foreach(println)
}

class ReceiverToTraversable[T](val runProducer: (ProviderAPI.Receiver[T] => Unit)) extends Traversable[T] {
  override def foreach[U](f: (T) => U) = {
    object MyReceiver extends ProviderAPI.Receiver[T] {
      def receive(entry: T) = f(entry)
      def close() = {}
    }
    runProducer(MyReceiver)
  }
}
于 2016-08-05T23:04:50.963 回答
0

QueueIteratorSolution

我提出的第二个解决方法附在这个问题上。我把它移到这里作为答案。

使用BlockingQueue[Option[T]]基于 nadavwr 建议的解决方案。queueCapacity它允许生产者在被消费者阻止之前继续生产。ArrayBlockingQueue我实现了一个使用给定容量的 QueueToIterator 。BlockingQueue 有一个take()方法,但没有peekor hasNext,所以我需要一个OptionNextToIterator如下:

trait OptionNextToIterator[T] extends Iterator[T] {
  def getOptionNext: Option[T]   // abstract
  def hasNext = { ... }
  def next = { ... }
}

注意:我正在使用synchronized里面的块OptionNextToIterator,我不确定它是否完全正确

解决方案:

import java.util.concurrent.ArrayBlockingQueue

object Main extends App {
  val receiverToIterator = new ReceiverToIterator[Int](queueCapacity = 3)
  ProviderAPI.run(receiverToIterator)

  Thread.sleep(3000)  // test that ProviderAPI.run can produce 3 items ahead before being blocked by the consumer
  receiverToIterator.filter(_ % 2 == 0).map("Entry#" + _).foreach(println)
}

class ReceiverToIterator[T](val queueCapacity: Int = 1) extends ProviderAPI.Receiver[T] with QueueToIterator[T] {
  def receive(entry: T) { queuePut(entry) }
  def close() { queueClose() }
}

trait QueueToIterator[T] extends OptionNextToIterator[T] {
  val queueCapacity: Int
  val queue = new ArrayBlockingQueue[Option[T]](queueCapacity)
  var queueClosed = false

  def queuePut(entry: T) {
    if (queueClosed) { throw new IllegalStateException("The queue has already been closed."); }
    queue.put(Some(entry))
  }

  def queueClose() {
    queueClosed = true
    queue.put(None)
  }

  def getOptionNext = queue.take
}

trait OptionNextToIterator[T] extends Iterator[T] {
  def getOptionNext: Option[T]

  var answerReady: Boolean = false
  var eof: Boolean = false
  var element: T = _

  def hasNext = {
    prepareNextAnswerIfNecessary()
    !eof
  }

  def next = {
    prepareNextAnswerIfNecessary()
    if (eof) { throw new NoSuchElementException }
    val retVal = element
    answerReady = false
    retVal
  }

  def prepareNextAnswerIfNecessary() {
    if (answerReady) {
      return
    }
    synchronized {
      getOptionNext match {
        case None => eof = true
        case Some(e) => element = e
      }
      answerReady = true
    }
  }
}
于 2016-08-07T13:44:35.660 回答
0

IteratorWithSemaphorSolution

我提出的第一个解决方案附在这个问题上。我把它移到这里作为答案。

import java.util.concurrent.Semaphore

object Main extends App {
  val iterator = new ReceiverToIterator[Int]
  ProviderAPI.run(iterator)
  iterator
    .filter(_ % 2 == 0)
    .map("Entry#" + _)
    .foreach(println)
}

class ReceiverToIterator[T] extends ProviderAPI.Receiver[T] with Iterator[T] {
  var lastEntry: T = _
  var waitingToReceive = new Semaphore(1)
  var waitingToBeConsumed = new Semaphore(1)
  var eof = false

  waitingToReceive.acquire()

  def receive(entry: T) {
    println("ReceiverToIterator.receive(" + entry + "). START.")
    waitingToBeConsumed.acquire()
    lastEntry = entry
    waitingToReceive.release()
    println("ReceiverToIterator.receive(" + entry + "). END.")
  }

  def close() {
    println("ReceiverToIterator.close().")
    eof = true
    waitingToReceive.release()
  }

  def hasNext = {
    println("ReceiverToIterator.hasNext().START.")
    waitingToReceive.acquire()
    waitingToReceive.release()
    println("ReceiverToIterator.hasNext().END.")
    !eof
  }

  def next = {
    println("ReceiverToIterator.next().START.")
    waitingToReceive.acquire()
    if (eof) { throw new NoSuchElementException }
    val entryToReturn = lastEntry
    waitingToBeConsumed.release()
    println("ReceiverToIterator.next().END.")
    entryToReturn
  }
}
于 2016-08-07T14:37:25.953 回答