这是我基于 ActorEventBus 和此 Gist 的解决方案:https://gist.github.com/3757237
我发现这比直接处理 EventStream 更易于维护。也许将来会需要多个 EventStream,但就目前而言,它可以很轻松地满足现有基础架构的需要。
消息总线
首先,MessageBus 按 PubSub 频道把传出消息路由到封装在 actor 中的套接字:
/** A text message together with the name of the channel it is published on. */
case class MessageEvent(channel: String, message: String)
/**
 * Event bus that delivers each MessageEvent to the subscribers registered
 * for that event's channel string.
 */
class MessageBus extends ActorEventBus with LookupClassification {
  type Event = MessageEvent
  type Classifier = String

  // Value handed to LookupClassification; presumably an initial size hint for
  // its classifier index -- confirm against the Akka event bus documentation.
  protected def mapSize(): Int = 10

  // An event is classified by nothing more than its channel name.
  protected def classify(event: Event): Classifier = event.channel

  // Delivery is a plain tell of the event to the subscriber.
  protected def publish(event: Event, subscriber: Subscriber): Unit =
    subscriber ! event
}
object MessageBus {
  // Actor system in which the per-socket actors below are created.
  val actorSystem = ActorSystem("contexts")
  // The one bus instance every socket actor is subscribed to.
  val Bus = new MessageBus

  /**
   * Creates a BrowserSocket actor around the given connection and subscribes it
   * to four channels: one keyed by the socket's string form, one keyed by the
   * user id, one keyed by the team id, and the shared "/app/browser" channel.
   *
   * The value returned is the result of the final subscribe call.
   *
   * NOTE(review): nothing visible in this object unsubscribes or stops the
   * actor once the socket goes away -- confirm that is handled elsewhere.
   */
  def browserSocketContext(s: WebSocketConnection, userId: Long, teamId: Long) = {
    val socketActor = actorSystem.actorOf(Props(new BrowserSocket(s, userId, teamId)))
    // Subscription order is kept: socket, user, team, then the global channel.
    val channels = Seq(
      "/app/socket/%s".format(s.toString),
      "/app/browser/u/%s".format(userId),
      "/app/browser/t/%s".format(teamId),
      "/app/browser"
    )
    channels.map(channel => Bus.subscribe(socketActor, channel)).last
  }
}
带有 Actor 的套接字访问
这是实际持有套接字的 actor:
/**
 * Actor holding a single browser socket; messages it receives are turned into
 * calls on that socket.
 */
class BrowserSocket(
  val s: WebSocketConnection,
  val userId: Long,
  val teamId: Long
) extends Actor {
  def receive = {
    // A published event: only its message text is written to the socket.
    case event: MessageEvent => s.send(event.message)
    // A ping request: its data is passed to the socket's ping call.
    case pingRequest: MessagePing => s.ping(pingRequest.data)
  }
}