4

我将akka流与reactive-rabbit库结合使用来构建一个脚本,将一些信息推送到我本地rabbitmq服务器上的交换中。

一旦信息被推送到队列中,我希望程序自行关闭。然而,Connection保持程序活着,我找不到任何方法Connection或其他如何杀死它的例子。不可避免地,我必须手动终止该进程。

我的代码看起来像这样:

package prototype

import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import io.scalac.amqp.{Message, Connection}

object PopulateTodoQueue extends App {
  val connection = Connection()

  val message = Message(ByteString("message"))

  val source = Source(List(message))
  val sink = Sink(connection.publishDirectly(queue = "todo"))

  implicit val actorSystem = ActorSystem()
  implicit val materializer = FlowMaterializer()

  (source to sink).run()

  // Quick hack to wait long enough for the message to send
  Thread.sleep(1000)
  actorSystem.shutdown()
}

这是我的 build.sbt 库依赖项中的一个片段:

"com.typesafe.akka"          %%  "akka-actor"               % "2.3.7",
"com.typesafe.akka"          %%  "akka-stream-experimental" % "0.11",
"io.scalac"                  %%  "reactive-rabbit"          % "0.2.1",

这些一次性任务是否有更好的模式 - 例如您将回调传递给的临时连接?我见过的示例中的所有用例都是针对长时间运行的客户端,这些客户端一直运行到用户明确杀死它们为止。

提前致谢!

4

1 回答 1

0

连接特征中有关闭方法

/** Shutdowns underlying connection.
    * Publishers and subscribers are terminated and notified via `onError`.
    * This method waits for all close operations to complete. */
  def shutdown(): Future[Unit]

你可以这样打电话

connection.shutdown()
于 2016-08-26T23:43:17.317 回答