3

我正在尝试使用 Source.actorRef 向演员 bindet 发送消息,但这部分代码:

println(s"Before mapping $src")
src.mapMaterializedValue { ref =>
  println(s"Mapping $ref")
  ref ! letter.text
}
println(s"After mapping $src")

只打印这样的东西:

映射前 Source(SourceShape(ActorRefSource.out), ActorRefSource(0, Fail) [5564f412])
映射后 Source(SourceShape(ActorRefSource.out), ActorRefSource(0, Fail) [5564f412])

所以。不知何故mapMaterializedValue不做任何事情。肯定没有消息发送给演员。是ref - None出于某种原因吗?

此外,我发布了所有代码。它是 websockets 上的简单信使(一对一消息)之类的情节。我现在只是研究 Akka 流,所以这段代码真的不完美。我准备好听取任何批评或建议。

主服务器对象:

package treplol.server

import treplol.common._

import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy}

import scala.io.StdIn
import java.util.UUID

object WsServer extends App {

  implicit val system = ActorSystem("example")
  implicit val materializer = ActorMaterializer()

  def createSource(uuid: UUID): Source[String, ActorRef] = {
    val src = Source.actorRef[String](0, OverflowStrategy.fail)
    sources(uuid) = src
    src
  }

  val sources: collection.mutable.HashMap[UUID, Source[String, ActorRef]] =
    collection.mutable.HashMap[UUID, Source[String, ActorRef]]()
  val userSources: collection.mutable.HashMap[String, UUID] =
    collection.mutable.HashMap[String, UUID]()

  def flow: Flow[Message, Message, Any] = {

    val uuid: UUID = UUID.randomUUID()
    val incomingSource: Source[String, ActorRef] = createSource(uuid)

    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val merge = b.add(Merge[String](2))

      val mapMsgToLttr = b.add(
        Flow[Message].collect { case TextMessage.Strict(txt) => txt }
          .map[Letter] { txt =>
            WsSerializer.decode(txt) match {
              case Auth(from) =>
                userSources(from) = uuid
                Letter("0", from, "Authorized!")
              case ltr: Letter => ltr
            }
          }
      )

      val processLttr = b.add(
        Flow[Letter].map[String] { letter =>
          userSources.get(letter.to) flatMap sources.get match {
            case Some(src) =>
              println(s"Before mapping $src")
              src.mapMaterializedValue { ref =>
                println(s"Mapping $ref")
                ref ! letter.text
              }
              println(s"After mapping $src")
              ""
            case None => "Not authorized!"
          }
        }
      )

      val mapStrToMsg = b.add(
        Flow[String].map[TextMessage] (str => TextMessage.Strict(str))
      )

      mapMsgToLttr ~> processLttr ~> merge
                   incomingSource ~> merge ~> mapStrToMsg

      FlowShape(mapMsgToLttr.in, mapStrToMsg.out)
    })

  }

  val route = path("ws")(handleWebSocketMessages(flow))
  val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

  println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
  StdIn.readLine()

  import system.dispatcher
  bindingFuture
    .flatMap(_.unbind())
    .onComplete(_ => system.terminate())
}

常用包:

package treplol

package object common {

  trait WsMessage
  case class Letter(from: String, to: String, text: String) extends WsMessage
  case class Auth(from: String) extends WsMessage

  object WsSerializer {

    import org.json4s.{Extraction, _}
    import org.json4s.jackson.JsonMethods.{compact, parse}
    import org.json4s.jackson.Serialization

    implicit val formats = {
      Serialization.formats(NoTypeHints)
    }

    case class WsData(typeOf: String, data: String)
    object WsDataType {
      val LETTER  = "letter"
      val AUTH    = "auth"
    }

    class WrongIncomingData extends Throwable

    def decode(wsJson: String): WsMessage = parse(wsJson).extract[WsData] match {
      case WsData(WsDataType.LETTER, data) => parse(data).extract[Letter]
      case WsData(WsDataType.AUTH, data) => parse(data).extract[Auth]
      case _ => throw new WrongIncomingData
    }

    def encode(wsMessage: WsMessage): String = {
      val typeOf = wsMessage match {
        case _: Letter => WsDataType.LETTER
        case _: Auth => WsDataType.AUTH
        case _ => throw new WrongIncomingData
      }
      compact(Extraction.decompose(
        WsData(typeOf, compact(Extraction.decompose(wsMessage)))
      ))
    }
  }

}

构建.sbt

name := "treplol"

version := "0.0"

scalaVersion := "2.12.1"

resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.16",
  "com.typesafe.akka" %% "akka-stream" % "2.4.16",
  "com.typesafe.akka" %% "akka-http" % "10.0.3",
  "org.json4s" %% "json4s-jackson" % "3.5.0"
)

谢谢大家!

4

2 回答 2

4

根据文档,mapMaterializedValue组合器

仅转换此 Source 的物化值,保留所有其他属性。

物化值仅在任何图形阶段(在本例中为源)运行后才可用。你永远不会在你的代码中运行你的源代码。

请注意,Flow[Message, Message, Any]您用来处理 WebSocket 消息的实际上是由 Akka-HTTP 基础架构运行的,因此您不需要手动执行此操作。但是,Source您在主体中创建的processLttr不会附加到图形的其余部分,因此不会运行。

有关运行图和具体化的更多信息,请参阅文档

于 2017-02-08T21:41:52.407 回答
0

感谢斯特凡诺!

但似乎没有办法用这种方式实现我想要的。但我更深入地挖掘并使用了自定义流处理和与 actor 的集成。使用这种技术,我可以将消息从外部推送到某个流。(此功能仍处于试验阶段!)

于 2017-02-12T17:54:47.753 回答