我目前正在研究一个简单的批处理,它使用 AKKA 流 1.0 来处理数据。如果我避免在流程步骤中使用 mapAsync 方法,一切都会顺利进行。
当 on complete 被调用时,结果文件被最终确定,并且代理系统通过 Reaper actor 关闭(参见 Reaper 模式):
val file = new File(inputFile)
val run: Future[Int] = source(file)
.via(parse)
.via(enrich)
.via(writeEnriched)
.runWith(printProgress)
run.onComplete { result: Try[Int] =>
context.system.log.info(s"Nb elements processed: ${result.get}")
writerActorRef ! FinalizeResults()
}
我要加速的步骤之一是丰富数据的部分。有时,数据无法丰富,应该在下一步忽略。
def enrich(implicit ec: ExecutionContext) : Flow[Data, EnrichedData, Unit]
= Flow[Data].map(enriched.enrich(_)).collect {
case Some(enrichedData) => enrichedData
}
所有这些代码都运行良好,并且在调用 onComplete() 时我没有丢失元素。
Input: 45639
Nb elements processed: 45639
当我尝试使用 mapAync 和 Future 而不是 map 进行丰富步骤来加速事物时,会在处理所有元素之前调用 onComplete。
def enrich(implicit ec: ExecutionContext) : Flow[Data, EnrichedData, Unit]
= Flow[Data].mapAsyncUnordered(8)(data => Future(enricher.enrich(data))).collect {
case Some(enrichedData) => enrichedData
}
我最后错过了一些元素,而且数字永远不会相同所有这些代码运行良好,并且在调用 onComplete() 时我没有丢失元素。
Input: 45639
Nb elements processed: 45628
我找不到方法来发现所有内容都已处理...知道我做错了什么吗?