2

我正在尝试编写一个简单的程序来从 std in 中获取行,解析它们,并将单个记录插入到 postgres 数据库中的每一行。为了测试,我一直在对一个文件运行它cat my_file | java ...

这是代码:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.io.Source
import scala.util.Failure
import scala.util.Success
import slick.driver.PostgresDriver.api._

object LoadToDB extends App {

  val db = Database.forConfig("dev-ingest")

  for (line <- Source.fromInputStream(System.in).getLines()) {
    val record = parse(line)
    val insert = TableQuery[MyTable] += record
    val fut = db.run(insert)

    fut onComplete {
      case Success(x) => {
        System.out.println("Inserted one record: " + record)
      }
      case Failure(e) => {
        e.printStackTrace()
      }
    }

  }
}

理论上,文件中的行数,打印的“插入一条记录”语句的数量,以及数据库中的记录数应该都匹配。但是,它们都是不同的。文件中的行数多于数据库中的记录,并且数据库中的记录多于打印的“插入”语句。

我对 scala/slick 的异步执行模型有点陌生,所以我怀疑我在那里做错了什么。也许当主线程结束时,所有剩余的线程都没有机会完成它们的执行?有没有办法说“等待所有提交的任务完成”?我试过Await.result(db.shutdown(), Duration.Inf)了,但这似乎阻止了任务运行完成,只是立即杀死了它。

4

1 回答 1

3

您的程序在期货完成之前退出。你必须在某个地方阻止它们。

于 2015-06-07T12:45:17.470 回答