假设这个 API 是给定的,我们不能改变它:
object ProviderAPI {
trait Receiver[T] {
def receive(entry: T)
def close()
}
def run(r: Receiver[Int]) {
new Thread() {
override def run() {
(0 to 9).foreach { i =>
r.receive(i)
Thread.sleep(100)
}
r.close()
}
}.start()
}
}
在此示例中,ProviderAPI.run
接受 a Receiver
,调用receive(i)
10 次然后关闭。通常,ProviderAPI.run
会receive(i)
根据可能是无限的集合调用。
此 API 旨在以命令式风格使用,例如外部迭代器。如果我们的应用程序需要过滤、映射和打印这个输入,我们需要实现一个接收器来混合所有这些操作:
object Main extends App {
class MyReceiver extends ProviderAPI.Receiver[Int] {
def receive(entry: Int) {
if (entry % 2 == 0) {
println("Entry#" + entry)
}
}
def close() {}
}
ProviderAPI.run(new MyReceiver())
}
现在,问题是如何在函数式风格、内部迭代器中使用 ProviderAPI(不改变提供给我们的 ProviderAPI 的实现)。请注意,ProviderAPI 也可以receive(i)
无限次调用,因此不能将所有内容收集到一个列表中(此外,我们应该逐个处理每个结果,而不是先收集所有输入,然后再处理)。
我在问如何实现这样的ReceiverToIterator
,以便我们可以在功能样式中使用 ProviderAPI:
object Main extends App {
val iterator = new ReceiverToIterator[Int] // how to implement this?
ProviderAPI.run(iterator)
iterator
.view
.filter(_ % 2 == 0)
.map("Entry#" + _)
.foreach(println)
}
更新
这里有四种解决方案:
IteratorWithSemaphorSolution:我首先提出的解决方案附在问题上
QueueIteratorSolution : 使用
BlockingQueue[Option[T]]
基于 nadavwr 的建议。queueCapacity
它允许生产者在被消费者阻止之前继续生产。PublishSubjectSolution:非常简单的解决方案,使用
PublishSubject
来自 Netflix RxJava-Scala API。SameThreadReceiverToTraversable:非常简单的解决方案,通过放宽问题的约束