2

我使用 Alpakka Cassandra 库编写了这个简单的应用程序

package com.abhi

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink}
import com.datastax.driver.core.{Cluster, Row, SimpleStatement}
import scala.concurrent.Await
import scala.concurrent.duration._

object MyApp extends App {
   implicit val actorSystem = ActorSystem()
   implicit val actorMaterializer = ActorMaterializer()
   implicit val session = Cluster
      .builder
      .addContactPoints(List("localhost") :_*)
      .withPort(9042)
      .withCredentials("foo", "bar")
      .build
      .connect("foobar")
   val stmt = new SimpleStatement("SELECT col1, col2 FROM foo").setFetchSize(20)
   val source = CassandraSource(stmt)
   val toFoo = Flow[Row].map(row => Foo(row.getLong(0), row.Long(1)))
   val sink = Sink.foreach[Foo](foo => println(foo.col1, foo.col2))
   val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit b =>
      s =>
      import GraphDSL.Implicits._
      source.take(10) ~> toFoo ~> s
      ClosedShape
   })
   // let us run the graph
   val future = graph.run()
   import actorSystem.dispatcher
   future.onComplete{_ =>
      session.close()
      Await.result(actorSystem.terminate(), Duration.Inf)
   }
   Await.result(future, Duration.Inf)
   System.exit(0)
}

case class Foo(col1: Long, col2: Long)

此应用程序完全按预期运行,它在屏幕上打印 10 行。

但是发布它挂起。调用执行时System.exit(0)会抛出异常

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0"

但应用程序仍然没有停止运行。它只是挂起。

我不明白为什么这个应用程序不能正常终止(实际上它甚至不需要 system.exit(0) 调用。

退出此应用程序的唯一方法是通过控件 C。

4

1 回答 1

2

这可能是因为 sbt 在其自己的 JVM 实例中运行您的代码,System.exit然后您将退出 sbt 的 JVM 并给出上述结果。

您是否尝试设置:fork in run := true在您的 sbt 构建中的某个位置?

我也不确定用它actorSystem.dispatcher来执行你的onComplete回调是一个好主意(因为你用它来等待actor系统本身的终止)。

你可以尝试的东西:

import actorSystem.dispatcher
future.onComplete{ _ =>
  session.close()
  actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)

请注意,当剩下的唯一线程是守护线程时,JVM 将退出而无需调用System.exit(参见示例What is Daemon thread in Java?)。

于 2017-09-12T07:08:26.863 回答