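I agree with Konrad that you should implement a new LookupClassification bus to solve your problem. I think the simplest approach is to have two separate instances of these buses, one that classifies by requesterId and one that classifies by operation. Some of the basic setup work for this approach would be: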
import akka.actor.ActorRef
import akka.event.{ActorEventBus, LookupClassification}

// Singleton to hold the instances of each stream type
object ResponseEventStream {
  val RequestorIdStream = new RequestorIdResponseEventStream
  val OperationStream = new OperationResponseEventStream
}
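// Common functionality for the two different types of streams
trait ResponseEventStream extends ActorEventBus with LookupClassification {
  type Event = Response
  type Classifier = String
  // Initial size of the internal classifier-to-subscribers index
  protected def mapSize = 128
  // Deliver to live subscribers and drop the ones that have terminated.
  // Note: ActorRef.isTerminated is deprecated in newer Akka releases;
  // watching subscribers with context.watch is the usual replacement.
  protected def publish(resp: Response, subscriber: ActorRef): Unit = {
    if (subscriber.isTerminated) unsubscribe(subscriber)
    else subscriber ! resp
  }
}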
// Concrete impl that uses requesterId to classify
class RequestorIdResponseEventStream extends ResponseEventStream {
  protected def classify(resp: Response): String = resp.requesterId
}
// Concrete impl that uses operation to classify
class OperationResponseEventStream extends ResponseEventStream {
  protected def classify(resp: Response): String = resp.operation
}
// Trait to mix into classes that need to publish or subscribe to response events.
// Has helper methods to simplify interaction with the two distinct streams.
trait ResponseEventing {
  import ResponseEventStream._
  // Publish to both streams so subscribers of either classification see the event
  def publishResponse(resp: Response): Unit = {
    RequestorIdStream.publish(resp)
    OperationStream.publish(resp)
  }
  def subscribeByRequestId(requestId: String, ref: ActorRef): Unit = {
    RequestorIdStream.subscribe(ref, requestId)
  }
  def subscribeByOperation(op: String, ref: ActorRef): Unit = {
    OperationStream.subscribe(ref, op)
  }
}
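Then you just need to mix the ResponseEventing trait into the actors that need to publish Response events or subscribe to them. The publishing actors call publishResponse, and the actors that need to subscribe call one of the subscribeXXX methods, depending on which classification (requesterId or operation) they are interested in.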
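
If it helps to see the routing behaviour in isolation, here is a minimal, dependency-free sketch of the same idea. It is not Akka code: plain callbacks stand in for actors, and LookupBus, LookupDemo, the body field and the sample keys are all hypothetical names made up for the illustration. Only requesterId and operation come from the setup above.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical Response shape; only requesterId and operation are taken from the answer
final case class Response(requesterId: String, operation: String, body: String)

// Simplified stand-in for a lookup-classified bus: handlers are stored per classifier key
final class LookupBus(classify: Response => String) {
  type Handler = Response => Unit
  private var handlers = Map.empty[String, Vector[Handler]]

  def subscribe(key: String)(handler: Handler): Unit =
    handlers = handlers.updated(key, handlers.getOrElse(key, Vector.empty[Handler]) :+ handler)

  // Only handlers registered under the event's own classifier are invoked
  def publish(resp: Response): Unit =
    handlers.getOrElse(classify(resp), Vector.empty[Handler]).foreach(h => h(resp))
}

object LookupDemo {
  val requesterBus = new LookupBus(_.requesterId)
  val operationBus = new LookupBus(_.operation)

  // Mirrors publishResponse: every response is offered to both buses
  def publishResponse(resp: Response): Unit = {
    requesterBus.publish(resp)
    operationBus.publish(resp)
  }

  val seenForClient1 = ListBuffer.empty[String]
  val seenForSave = ListBuffer.empty[String]

  requesterBus.subscribe("client-1")(r => seenForClient1 += r.body)
  operationBus.subscribe("save")(r => seenForSave += r.body)

  def main(args: Array[String]): Unit = {
    publishResponse(Response("client-1", "load", "a"))
    publishResponse(Response("client-2", "save", "b"))
    publishResponse(Response("client-1", "save", "c"))
    println(seenForClient1.toList) // List(a, c)
    println(seenForSave.toList)    // List(b, c)
  }
}
```

The third response matches both subscriptions, which is why publishing to both streams matters: each subscriber only ever sees events whose classifier equals the key it registered with.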