2

我目前正在研究一个简单的批处理,它使用 AKKA 流 1.0 来处理数据。如果我避免在流程步骤中使用 mapAsync 方法,一切都会顺利进行。

当 on complete 被调用时,结果文件被最终确定,并且代理系统通过 Reaper actor 关闭(参见 Reaper 模式):

val file = new File(inputFile)
val run: Future[Int] = source(file)
 .via(parse)
 .via(enrich)
 .via(writeEnriched)
 .runWith(printProgress)
run.onComplete { result: Try[Int] =>
 context.system.log.info(s"Nb elements processed: ${result.get}")
 writerActorRef ! FinalizeResults()
}

我要加速的步骤之一是丰富数据的部分。有时,数据无法丰富,应该在下一步忽略。

def enrich(implicit ec: ExecutionContext) : Flow[Data, EnrichedData, Unit]
= Flow[Data].map(enriched.enrich(_)).collect {
 case Some(enrichedData) => enrichedData
}

所有这些代码都运行良好,并且在调用 onComplete() 时我没有丢失元素。

Input: 45639
Nb elements processed: 45639

当我尝试使用 mapAync 和 Future 而不是 map 进行丰富步骤来加速事物时,会在处理所有元素之前调用 onComplete。

def enrich(implicit ec: ExecutionContext) : Flow[Data, EnrichedData, Unit]
   = Flow[Data].mapAsyncUnordered(8)(data => Future(enricher.enrich(data))).collect {
     case Some(enrichedData) => enrichedData
   }

我最后错过了一些元素,而且数字永远不会相同所有这些代码运行良好,并且在调用 onComplete() 时我没有丢失元素。

Input: 45639
Nb elements processed: 45628

我找不到方法来发现所有内容都已处理...知道我做错了什么吗?

4

1 回答 1

2

最后我发现了问题。我的问题不在 akka 流上,而是在不是线程安全的限制检查器上......感谢您的帮助对于那些感兴趣的人,我已经在 github 上上传了一些代码:https ://github.com/PixelDuck/akka-stream -测试

于 2015-08-11T16:08:39.750 回答