我正在尝试编写一个简单的程序来从 std in 中获取行,解析它们,并将单个记录插入到 postgres 数据库中的每一行。为了测试,我一直在对一个文件运行它cat my_file | java ...
这是代码:
import scala.concurrent.ExecutionContext.Implicits.global
import scala.io.Source
import scala.util.Failure
import scala.util.Success
import slick.driver.PostgresDriver.api._
object LoadToDB extends App {
val db = Database.forConfig("dev-ingest")
for (line <- Source.fromInputStream(System.in).getLines()) {
val record = parse(line)
val insert = TableQuery[MyTable] += record
val fut = db.run(insert)
fut onComplete {
case Success(x) => {
System.out.println("Inserted one record: " + record)
}
case Failure(e) => {
e.printStackTrace()
}
}
}
}
理论上,文件中的行数,打印的“插入一条记录”语句的数量,以及数据库中的记录数应该都匹配。但是,它们都是不同的。文件中的行数多于数据库中的记录,并且数据库中的记录多于打印的“插入”语句。
我对 scala/slick 的异步执行模型有点陌生,所以我怀疑我在那里做错了什么。也许当主线程结束时,所有剩余的线程都没有机会完成它们的执行?有没有办法说“等待所有提交的任务完成”?我试过Await.result(db.shutdown(), Duration.Inf)
了,但这似乎阻止了任务运行完成,只是立即杀死了它。