I finally found a solution using Actors. I came across this:
def conect = WebSocket.accept[JsValue, JsValue] { request =>
  ActorFlow.actorRef(out => UserWebSocket.props(out, users))
}
Then I looked at the source code of ActorFlow.actorRef:
https://github.com/playframework/playframework/blob/2.5.0/framework/src/play-streams/src/main/scala/play/api/libs/streams/ActorFlow.scala
and came up with this solution:
import javax.inject._
import play.api.Configuration
import play.api.mvc._
import scala.concurrent._
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.actor._

class UserActor(out: ActorRef) extends Actor {
  def receive = {
    // receives messages from client browser here
    // out is actor that will send messages back to client(s)
    case msg: String => out ! "Received message " + msg
  }
}

object UserActor {
  def props(out: ActorRef) = Props(new UserActor(out))
}

@Singleton
class NotificationController @Inject()(val config: Configuration)
    (implicit ec: ExecutionContext, actorSystem: ActorSystem, materializer: Materializer) extends Controller {

  // outActor can be used to send messages to client(s)
  // Sink.asPublisher(true) makes this a broadcast channel (multiple clients can
  // connect to this channel, and messages sent to outActor are broadcast to all
  // of them). Use Sink.asPublisher(false) to create a unicast channel.
  val (outActor, publisher) = Source.actorRef[String](99, OverflowStrategy.dropNew)
    .toMat(Sink.asPublisher(true))(Keep.both).run()

  def flowsocket = WebSocket.accept[String, String] { request =>
    val aflow: Flow[String, String, _] = {
      val sink = Sink.actorRef(actorSystem.actorOf(UserActor.props(outActor)), akka.actor.Status.Success(()))
      val source = Source.fromPublisher(publisher)
      Flow.fromSinkAndSource(sink, source)
    }
    aflow
  }
}
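Since then, I have modified my solution to embrace the Actor model more fully. I now have a "UsersBroadcastActor", a singleton actor that all the other "UserActor"s connect to and communicate through: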
lazy val broadcastActorRef = actorSystem.actorOf(Props[UsersBroadcastActor])

def flowsocket = WebSocket.accept[JsValue, JsValue] { request =>
  ActorFlow.actorRef(out => UserActor.props(out, broadcastActorRef))
}
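When a UserActor is instantiated, its preStart() method sends a subscription message to broadcastActorRef, which keeps a reference to every UserActor that has "subscribed" to it. I can then send a message to broadcastActorRef, and it gets forwarded to each UserActor. Let me know if you would like a full code example of this solution as well.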
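In the meantime, here is a rough sketch of how the two actors might look. It is only an illustration of the subscribe-and-forward idea described above, not the exact code I am running: the message types `Subscribe` and `Broadcast` are names I made up for this example, and the cleanup via `context.watch` is an assumption about how you would want to drop closed connections.

```scala
import akka.actor.{Actor, ActorRef, Props, Terminated}
import play.api.libs.json.JsValue

// Hypothetical message protocol (these names are assumptions for this sketch)
case object Subscribe
final case class Broadcast(msg: JsValue)

// Singleton actor that keeps a reference to every subscribed UserActor
class UsersBroadcastActor extends Actor {
  private var subscribers = Set.empty[ActorRef]

  def receive: Receive = {
    case Subscribe =>
      subscribers += sender()
      // Watch the subscriber so we are told when its WebSocket goes away
      context.watch(sender())
    case Terminated(ref) =>
      subscribers -= ref
    case b: Broadcast =>
      // Forward the wrapped message to every subscribed UserActor
      subscribers.foreach(_ ! b)
  }
}

// One instance per WebSocket connection, created by ActorFlow.actorRef
class UserActor(out: ActorRef, broadcaster: ActorRef) extends Actor {
  // Register with the broadcaster as soon as this actor starts
  override def preStart(): Unit = broadcaster ! Subscribe

  def receive: Receive = {
    // Raw JsValue: came from this client's browser, hand it to the broadcaster
    case msg: JsValue => broadcaster ! Broadcast(msg)
    // Wrapped message: came from the broadcaster, push it out to the browser
    case Broadcast(msg) => out ! msg
  }
}

object UserActor {
  def props(out: ActorRef, broadcaster: ActorRef): Props =
    Props(new UserActor(out, broadcaster))
}
```

Wrapping broadcast messages in `Broadcast` is just one way to let a UserActor tell "message from my client" apart from "message from the broadcaster"; comparing `sender()` against `broadcaster` would probably work too. Anything else in the application can also send `Broadcast(json)` to broadcastActorRef to push a message to all connected clients.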