2

作为参考,这个问题源于通过套接字循环的 Scala 方法性能(收集或 foreach 或其他)?

我在一个actor中存储一个对websocket的引用,然后将该actor订阅到一个Akka EventStream:

val socketActor = system.actorOf(Props(new Actor {
  val socket = WebSocketConnection

  def receive = {
    case d: AppMessage ⇒ socket.send(d)
  }
}))
system.eventStream.subscribe(socketActor, classOf[AppMessage])

让我烦恼的是,我可以用 EventStream 制作的唯一分类器是类类型。因此,如果您想将消息路由到不同的参与者,比如基于 userId,您是否需要创建多个 EventStreams 并手动构建 EventBus 或者这里有什么我遗漏的东西?

如果我能做一些简单的事情就好了:

system.eventStream.subscribe(socketActor, Map("userId" -> userId, "teamId" -> teamId) )

这可能只是一个概念问题,因为我不太确定 EventStream 代表什么。

4

2 回答 2

3

这是我基于ActorEventBus此 Gist 的解决方案:https ://gist.github.com/3757237

我发现这比处理 EventStreams 更易于维护。也许将来需要多个 EventStream,但此时它很容易支持当前的基础架构。

消息总线

首先,MessageBus 根据 PubSub 通道处理到封装在 actor 中的套接字的传出消息:

case class MessageEvent(val channel:String, val message:String)

/**
 * message bus to route messages to their appropriate contexts
 */
class MessageBus extends ActorEventBus with LookupClassification {

    type Event = MessageEvent
  type Classifier = String

  protected def mapSize(): Int = {
    10
  }

  protected def classify(event: Event): Classifier = {
    event.channel
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event
  }

}


object MessageBus {

  val actorSystem = ActorSystem("contexts")
  val Bus = new MessageBus

  /**
   * create an actor that stores a browser socket
   */
  def browserSocketContext(s: WebSocketConnection, userId: Long, teamId: Long) = {
    val subscriber = actorSystem.actorOf(Props(new BrowserSocket(s,userId,teamId)))

    Bus.subscribe( subscriber, "/app/socket/%s" format s.toString)
    Bus.subscribe( subscriber, "/app/browser/u/%s" format userId )
    Bus.subscribe( subscriber, "/app/browser/t/%s" format teamId )
    Bus.subscribe( subscriber, "/app/browser" )
  }
}

带有 Actor 的套接字访问

这是实际包含套接字的演员:

/**
 * actor wrapping access for browser socket
 */
class BrowserSocket(
  val s: WebSocketConnection,
  val userId: Long,
  val teamId: Long

) extends Actor {

  def receive = {
    case payload:MessageEvent => 
      s.send(payload.message)

    case ping:MessagePing =>
      s.ping(ping.data)

  }

}
于 2012-09-21T16:54:48.920 回答
1

据我所知,EventStreams 和 Event Bus 用于记录和监控。您通常使用 Actor 构建所需的功能并在它们之间传递消息。

因此,您将其发送AppMessage给自定义路由器参与者,该参与者将整理出要发送到的支持参与者。如果路由器认为合适,也许路由器可以产生支持演员,或者演员可以在路由器上订阅(通过传递适当的消息)。这主要取决于您需要实现的逻辑。

于 2012-09-13T23:00:27.053 回答