尽管您可以使用 aSink.foreach
来实现这一点(如 Ramon 所述),但使用mapAsync
Flow
. 使用时您将面临的问题Sink.foreach
是它没有返回值。db.run
通过 slicks方法插入数据库Future
会返回 a 然后从返回的蒸汽中逃脱,Future[Done]
一旦完成,该蒸汽就会Sink.foreach
完成。
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") {
def value = column[Int]("value")
def * = value
}
val numbers = TableQuery[Numbers]
val db = Database.forConfig("postgres")
Await.result(db.run(numbers.schema.create), Duration.Inf)
val streamFuture: Future[Done] = Source(0 to 100)
.runWith(Sink.foreach[Int] { (i: Int) =>
db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done"))
})
Await.result(streamFuture, Duration.Inf)
println("stream 1 done")
//// sample 1 output: ////
// stream 1 insert 1 done
// ...
// stream 1 insert 99 done
// stream 1 done <-- stream Future[Done] returned before inserts finished
// stream 1 insert 100 done
另一方面,def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T])
Flow
允许您通过并行度参数并行运行插入,并接受从上游输出值到某种类型的未来的函数。这符合我们的i => db.run(numbers += i)
功能。这样做的Flow
好处是,它随后将这些结果提供给Futures
下游。
val streamFuture2: Future[Done] = Source(0 to 100)
.mapAsync(1) { (i: Int) =>
db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r }
}
.runWith(Sink.ignore)
Await.result(streamFuture2, Duration.Inf)
println("stream 2 done")
//// sample 2 output: ////
// stream 2 insert 1 done
// ...
// stream 2 insert 100 done
// stream 1 done <-- stream Future[Done] returned after inserts finished
为了证明这一点,您甚至可以从流中返回一个真实的结果,而不是一个Future[Done]
(With Done 表示 Unit)。此流还将添加更高的并行度值和批处理以获得额外的性能。*
val streamFuture3: Future[Int] = Source(0 to 100)
.via(Flow[Int].grouped(10)) // Batch in size 10
.mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count
.runWith(Sink.fold(0)(_+_)) // count all inserts and return total
val rowsInserted = Await.result(streamFuture3, Duration.Inf)
println(s"stream 3 done, inserted $rowsInserted rows")
// sample 3 output:
// stream 3 done, inserted 101 rows
- 注意:对于这么小的数据集,您可能不会看到更好的性能,但是当我处理 1.7M 插入时,我能够在批量大小为 1000 且并行度值为 8 的机器上获得最佳性能,在本地使用 postgresql。这大约是不并行运行的两倍。与往常一样,在处理性能时,您的结果可能会有所不同,您应该自己衡量。