0

有两个表TableATableB

我需要将一些记录从 复制TableATableB。我使用slick-3.0和使用以下方式:

import akka.stream._
import akka.stream.scaladsl._
...

//{{ READ DATA FROM TABLE A
val q = TableA.filter(somePredicate).result
val source = Source.fromPublisher {
      db.stream(q.result).mapResult { r => 
        val record: RecordA = someTransformation(r) 
        record
      }
    }.grouped(50) // grouping because I want to write records in batch mode
//}}

//{{ WRITE DATA TO TABLE B
val f:Future[Done] = source.runWith(Sink.foreach { batch: Seq[RecordA] =>
      //TODO how to write batch to TableB asynchronously?
      val insertAction = TableB ++= batch  // insert batch to table
      val fInsert: Future[_] = db.run(insertAction)
      Await.result(fInsert, ...)           // #1 this works only with blocking
})
//}}

但是我遇到了一个问题 - 如何将批处理写入TableB异步(请参阅 TODO)。现在上面的代码只适用于阻塞到内部未来(见#1评论)。是否有异步执行该任务的正确方法?

4

1 回答 1

2

使用mapAsync它期望返回一个未来,并在下一阶段公开“解包”结果。

source.mapAsync(4){batch: Seq[RecordA] =>
      val insertAction = TableB ++= batch  // insert batch to table
      db.run(insertAction)
}).to(Sink.ignore).run
于 2017-06-08T09:52:02.777 回答