9

在我的场景中,客户端发送“再见”websocket 消息,我需要关闭以前在服务器端建立的连接。

来自 akka-http文档

通过取消来自服务器逻辑的传入连接流(例如,通过将其下游连接到 Sink.cancelled 并将其上游连接到 Source.empty),可以关闭连接。也可以通过取消 IncomingConnection 源连接来关闭服务器的套接字。

但考虑到这一点SinkSource在协商新连接时设置一次,我不清楚如何做到这一点:

(get & path("ws")) {
  optionalHeaderValueByType[UpgradeToWebsocket]() {
    case Some(upgrade) ⇒
      val connectionId = UUID()
      complete(upgrade.handleMessagesWithSinkSource(sink, source))
    case None ⇒
      reject(ExpectedWebsocketRequestRejection)
  }
}
4

3 回答 3

5

提示:此答案基于akka-stream-experimentalversion 2.0-M2。API 在其他版本中可能略有不同。


关闭连接的一种简单方法是使用PushStage

import akka.stream.stage._

val closeClient = new PushStage[String, String] {
  override def onPush(elem: String, ctx: Context[String]) = elem match {
    case "goodbye" ⇒
      // println("Connection closed")
      ctx.finish()
    case msg ⇒
      ctx.push(msg)
  }
}

在客户端或服务器端接收到的每个元素(以及通常通过 a 的每个元素Flow)都会通过这样的Stage组件。在 Akka 中,全抽象被称为GraphStage,更多信息可以在官方文档中找到。

使用 aPushStage我们可以观察具体传入元素的值,然后相应地转换上下文。在上面的示例中,一旦goodbye收到消息,我们就会完成上下文,否则我们只需通过push方法转发值。

现在,我们可以closeClient通过该方法将组件连接到任意流transform

val connection = Tcp().outgoingConnection(address, port)

val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .transform(() ⇒ closeClient)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_ + "\n")
  .map(ByteString(_))

connection.join(flow).run()

上面的流程接收 aByteString并返回 a ByteString,这意味着它可以connection通过该join方法连接。在流程内部,我们首先将字节转换为字符串,然后再将它们发送到closeClient. 如果PushStage没有完成流,则元素在流中被转发,在那里它被丢弃并被来自标准输入的一些输入替换,然后通过线路发送回。如果流完成了,舞台组件之后的所有进一步的流处理步骤都将被删除 - 流现在已关闭。

于 2015-12-18T22:50:13.043 回答
4

这可以通过在当前(2.4.14)版本的akka​​-stream中完成

package com.trackabus.misc

import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

// terminates the flow based on a predicate for a message of type T
// if forwardTerminatingMessage is set the message is passed along the flow
// before termination
// if terminate is true the stage is failed, if it is false the stage is completed
class TerminateFlowStage[T](
    pred: T => Boolean, 
    forwardTerminatingMessage: Boolean = false, 
    terminate: Boolean = true)
  extends GraphStage[FlowShape[T, T]]
{
  val in = Inlet[T]("TerminateFlowStage.in")
  val out = Outlet[T]("TerminateFlowStage.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) {

      setHandlers(in, out, new InHandler with OutHandler {
        override def onPull(): Unit = { pull(in) }

        override def onPush(): Unit = {
          val chunk = grab(in)

          if (pred(chunk)) {
            if (forwardTerminatingMessage)
              push(out, chunk)
            if (terminate)
              failStage(new RuntimeException("Flow terminated by TerminateFlowStage"))
            else
              completeStage()
          }
          else
            push(out, chunk)
        }
      })
  }
}

使用它定义你的阶段

val termOnKillMe = new TerminateFlowStage[Message](_.isInstanceOf[KillMe])

然后将其作为流程的一部分

.via(termOnKillMe)
于 2016-12-04T19:43:19.877 回答
2

另一种方法是使用 Source.queue 中的队列管理连接。队列可用于向客户端发送消息以及关闭连接。

def socketFlow: Flow[Message, Message, NotUsed] = {
  val (queue, source) = Source.queue[Message](5, OverflowStrategy.fail).preMaterialize()

  // receive client message 
  val sink = Sink.foreach[Message] {
    case TextMessage.Strict("goodbye") =>
      queue.complete() // this closes the connection
    case TextMessage.Strict(text) =>
      // send message to client by using offer
      queue.offer(TextMessage(s"you sent $text")) 
  }
  Flow.fromSinkAndSource(sink, source)
}

// you then produce the upgrade response like this
val response = upgrade.handleMessages(socketFlow)

将队列用于 WebSockets 的一个好处是,只要您有权访问它,您就可以随时使用它来发送消息,而不必等待传入的消息回复。

于 2018-06-22T20:09:13.920 回答