使用 scala 2.11.7、rxscala_2.11 0.25.0、rxjava 1.0.16,我的oddFutures
回调不会被调用AsyncDisjointedChunkMultiprocessing.process()
:
package jj.async
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import rx.lang.scala.Observable
import jj.async.helpers._
/* Problem: How to multi-process records asynchronously in chunks.
Processing steps:
- fetch finite # of records from a repository (10 at-a-time (<= 10 for last batch) because of downstream limitations)
- process ea. chunk through a filter asynchronously (has 10-record input limit)
- compute the reverse of the filtered result
- enrich (also has 10-record input limit) filtered results asynchronously
- return enriched filtered results once all records are processed
*/
object AsyncDisjointedChunkMultiprocessing {
private implicit val ec = ExecutionContext.global
def process(): List[Enriched] = {
@volatile var oddsBuffer = Set[Int]()
@volatile var enrichedFutures = Observable just Set[Enriched]()
oddFutures.foreach(
odds =>
if (odds.size + oddsBuffer.size >= chunkSize) {
val chunkReach = chunkSize - oddsBuffer.size
val poors = oddsBuffer ++ odds take chunkReach
enrichedFutures = enrichedFutures + poors
oddsBuffer = odds drop chunkReach
} else {
oddsBuffer ++= odds
},
error => throw error,
() => enrichedFutures + oddsBuffer)
enrichedFutures.toBlocking.toList.flatten
}
private def oddFutures: Observable[Set[Int]] =
Repository.query(chunkSize) { chunk =>
evenFuture(chunk) map {
filtered => chunk -- filtered
}
}
private def evenFuture(chunk: Set[Int]): Future[Set[Int]] = {
checkSizeLimit(chunk)
Future { Remote even chunk }
}
}
class Enriched(i: Int)
object Enriched {
def apply(i: Int) = new Enriched(i)
def enrich(poors: Set[Int]): Set[Enriched] = {
checkSizeLimit(poors);
Thread.sleep(1000)
poors map { Enriched(_) }
}
}
object Repository {
def query(fetchSize: Int)(f: Set[Int] => Future[Set[Int]]): Observable[Set[Int]] = {
implicit val ec = ExecutionContext.global
Observable.from {
Thread.sleep(20)
f(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
Thread.sleep(20)
f(Set(11, 12, 13, 14, 15, 16, 17, 18, 19, 20))
Thread.sleep(15)
f(Set(21, 22, 23, 24, 25))
}
}
}
package object helpers {
val chunkSize = 10
implicit class EnrichedObservable(enrichedObs: Observable[Set[Enriched]]) {
def +(poors: Set[Int]): Observable[Set[Enriched]] = {
enrichedObs merge Observable.just {
Enriched.enrich(poors)
}
}
}
def checkSizeLimit(set: Set[_ <: Any]) =
if (set.size > chunkSize) throw new IllegalArgumentException(s"$chunkSize-element limit violated: ${set.size}")
}
// unmodifiable
object Remote {
def even = { xs: Set[Int] =>
Thread.sleep(1500)
xs filter { _ % 2 == 0 }
}
}
我创建Observable.from(Future)
in的方式有什么问题Repository.query()
吗?