I have an actor with the following behavior:

  def receive: Receive = {
    case Info(message) =>
      val res = send("INFO:" + message)
      installAckHook(res)
    case Warning(message) =>
      val res = send("WARNING:" + message)
      installAckHook(res)
    case Error(message) =>
      val res = send("ERROR:" + message)
      installAckHook(res)
  }

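  private def installAckHook[T](fut: Future[T]): Unit = {
    // Capture the sender now: sender() must not be called from inside the Future callback.
    val answerTo = sender()

    // Reply "OK" on success, or forward the exception on failure.
    fut.onComplete {
      case Success(_) => answerTo ! "OK"
      case Failure(ex) => answerTo ! ex
    }
  }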

  private def send(message: String): Future[HttpResponse] = {
    import context.system
    val payload: Payload = Payload(text = message,
      username = slackConfig.username, icon_url = slackConfig.iconUrl,
      icon_emoji = slackConfig.iconEmoji, channel = slackConfig.channel)
      .validate
    Http().singleRequest(RequestBuilding.Post(slackConfig.hookAddress, payload))
  }

And a test:

val actorRef = system.actorOf(SlackHookActor.props(SlackEndpointConfig(WebHookUrl,iconEmoji = Some(":ghost:"))))
actorRef ! Error("Some error message")
actorRef ! Warning("Some warning message")
actorRef ! Info("Some info message")
receiveN(3)

In the afterAll() method, I shut down the actor system using TestKit.

It works, and the requests are sent to the server, but the Akka Streams part logs errors:

[ERROR] [06/26/2015 11:34:55.118] [SlackHookTestingSystem-akka.actor.default-dispatcher-10] [ActorSystem(SlackHookTestingSystem)] Outgoing request stream error (akka.stream.AbruptTerminationException)
[ERROR] [06/26/2015 11:34:55.120] [SlackHookTestingSystem-akka.actor.default-dispatcher-13] [ActorSystem(SlackHookTestingSystem)] Outgoing request stream error (akka.stream.AbruptTerminationException)
[ERROR] [06/26/2015 11:34:55.121] [SlackHookTestingSystem-akka.actor.default-dispatcher-8] [ActorSystem(SlackHookTestingSystem)] Outgoing request stream error (akka.stream.AbruptTerminationException)

Since the Future has completed, it seems the outgoing connection should already be closed. Is this a bug, or am I missing something?

1 Answer

You also need to shut down the HTTP connection pools before shutting down the actor system, for example:

Http().shutdownAllConnectionPools().onComplete{ _ =>
  system.shutdown()
}

Perhaps the akka-http testkit provides some help with this.
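For a TestKit-based spec like the one in the question, the same idea could be placed in afterAll(). The following is only a rough sketch under several assumptions: the spec class name SlackHookActorSpec is hypothetical, the ScalaTest style trait is assumed to be WordSpecLike (newer ScalaTest versions call it AnyWordSpecLike), and the 10 second timeout is arbitrary. It blocks until the pools are closed and only then stops the actor system, so no pooled connection should be cut off mid-stream.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.testkit.{ImplicitSender, TestKit}
import org.scalatest.{BeforeAndAfterAll, WordSpecLike}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical spec class; the system name matches the one in the question's log output.
class SlackHookActorSpec
    extends TestKit(ActorSystem("SlackHookTestingSystem"))
    with ImplicitSender
    with WordSpecLike
    with BeforeAndAfterAll {

  // ... tests that send Info/Warning/Error to the actor go here ...

  override def afterAll(): Unit = {
    // Wait until every pooled HTTP connection has been closed
    // (the timeout value is an assumption, adjust as needed).
    Await.result(Http().shutdownAllConnectionPools(), 10.seconds)
    // Only then stop the actor system itself.
    TestKit.shutdownActorSystem(system)
  }
}
```

Blocking with Await is usually avoided in production code, but in a test teardown it keeps the ordering explicit: pools first, actor system second.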

answered 2015-08-26T13:29:14.993