8

这个想法是保持通道打开以供以后使用。在 playframework 2.5.x 中,文档说您必须使用 akka 流,但没有说明如何实现此示例。有人可以帮助我吗?

import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext

def socket =  WebSocket.using[String] { request =>

  // Concurrent.broadcast returns (Enumerator, Concurrent.Channel)
  val (out, channel) = Concurrent.broadcast[String]

  // log the message to stdout and send response back to client
  val in = Iteratee.foreach[String] {
    msg => println(msg)
      // the Enumerator returned by Concurrent.broadcast subscribes to the channel and will
      // receive the pushed messages
      channel push("I received your message: " + msg)
  }
  (in,out)
}
4

3 回答 3

4

你必须做这样的事情!

val (subscriber, publisher)=Source.asSubscriber[String]
      .toMat(Sink.asPublisher[String](fanout = true))(Keep.both).run()

def websocketAction=WebSocket.accept { requestHeader =>
    Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber),Source.fromPublisher(publisher))
}

第一部分将在给定接收器和流的情况下创建推送消息和接收消息(订阅发布者)所需的对象。

最后,您将为使用该代码收到的每个 websocket 请求创建一个流Flow.fromSinkAndSource……关于 Akka 流(Sources、Sinks 和Flows)尚不清楚的是它们代表流的形状,而不是流本身。 ..当您将它们具体化(使用方法runWithrun)时,流程就会发生。现在... Play 接收Sources(使用服务器发送事件时)或Flow使用 WebSockets 时接收 s。而且它们还没有物化......所以你需要物化它们(第一行),然后再创建一个流!(websocketAction 行)

对不起,如果我不够清楚,但是使用该代码,它将起作用。

于 2016-07-19T04:37:29.740 回答
1

我终于找到了使用 Actors 的解决方案。我找到了这个:

def conect = WebSocket.accept[JsValue, JsValue] {request => 
  ActorFlow.actorRef(out => UserWebSocket.props(out, users))
}

然后看了下ActorFlow.actorRef的源码: https ://github.com/playframework/playframework/blob/2.5.0/framework/src/play-streams/src/main/scala/play/api/libs/流/ActorFlow.scala

并想出了这个解决方案:

import javax.inject._
import play.api.Configuration
import play.api.mvc._
import scala.concurrent._

import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.actor._

class UserActor(out: ActorRef) extends Actor {
  def receive = {
    // receives messages from client browser here
    // out is actor that will send messages back to client(s)
    case msg: String => out ! "Received message "+msg
  }
}
object UserActor {
  def props(out: ActorRef) = Props(new UserActor(out))
}

@Singleton
class NotificationController @Inject()(val config:Configuration)
                          (implicit ec: ExecutionContext, actorSystem:ActorSystem, materializer: Materializer) extends Controller {

  // outActor can be used to send messages to client(s)
  // Sink.asPublisher(true) makes this a broadcast channel (multiple clients can connect to this channel, and messages sent to outActor are broadcast to all of them).  Use Sink.asPublisher(false) to create a unicast channel.
  val (outActor, publisher) = Source.actorRef[String](99, OverflowStrategy.dropNew)
        .toMat(Sink.asPublisher(true))(Keep.both).run()


  def flowsocket = WebSocket.accept[String, String] {request =>
    val aflow:Flow[String, String, _] = {

        val sink = Sink.actorRef( actorSystem.actorOf(UserActor.props(outActor)), akka.actor.Status.Success(()) )

        val source = Source.fromPublisher(publisher)

        Flow.fromSinkAndSource(
            sink, source
        )
    }
    aflow
  }

}

此后,我修改了我的解决方案以更全面地采用 Actor 模型。我现在有一个“UsersBroadcastActor”,它是一个所有其他“UserActor”连接并可以通过它进行通信的单例演员:

lazy val broadcastActorRef = actorSystem.actorOf(Props[UsersBroadcastActor])

def flowsocket = WebSocket.accept[JsValue, JsValue] { request =>
    ActorFlow.actorRef(out => UserActor.props(out, broadcastActorRef))
}

当 UserActor 被实例化时,在它的 preStart() 方法中,它发送一个订阅消息到 broadcastActorRef,它保存对所有“订阅”它的 UserActor 的引用。我可以向broadcastActorRef 发送一条消息,并将其转发给每个UserActor。如果您也想要此解决方案的完整代码示例,请告诉我。

于 2016-03-24T20:34:27.890 回答
-1

我认为您只是在寻找如何使用 Play 2.5 和 Akka Streams 流进行 Echo websocket 连接。

这应该可以解决问题

  def socket = WebSocket.accept[String, String] { request =>
    Flow[String]
      .map(msg => "I received your message: " + msg)
  }
于 2016-03-15T16:44:32.310 回答