0

我有下面的代码,我使用可变列表缓冲区来存储从 kafka consumer 收到的文件,然后当列表大小达到 15 时,我将它们插入到 cassandra 中。但是他们有什么方法可以使用不可变列表来做同样的事情。

  val filesList = ListBuffer[SystemTextFile]()
  storeservSparkService.configFilesTopicInBatch.subscribe.atLeastOnce(Flow[SystemTextFile].mapAsync(4) { file: SystemTextFile =>
    filesList += file
    if (filesList.size == 15) {
      storeServSystemRepository.config.insertFileInBatch(filesList.toList)
      filesList.clear()
    }
    Future(Done)
  })
4

2 回答 2

3

这些方面的东西?

Flow[SystemTextFile].grouped(15).mapAsync(4){ files =>
  storeServSystemRepository.config.insertFileInBatch(files)
}
于 2017-04-17T13:30:55.243 回答
0

您是否尝试过使用矢量?

      val filesList = Vector[SystemTextFile]()
      storeservSparkService.configFilesTopicInBatch.subscribe.
          atLeastOnce(Flow[SystemTextFile].mapAsync(4) { file: SystemTextFile =>
       filesList = filesList :+ file
       if (filesList.length == 15) {
            storeServSystemRepository.config.insertFileInBatch(filesList.toList)
       }
       Future(Done)
     })
于 2017-04-17T12:57:27.760 回答