0

我正在研究 Akka EventBus 以检查它是否可以解决我的设计问题之一,但我仍然不知道。问题如下。

为了简化,我有:

case class Request(requesterId: String, operation: String, header:  RequestHeader)
case class Response(requesterId: String, operation: String, header: ResponseHeader)

Response我有几个具有不同功能的演员,我希望一些演员根据订阅requesterId,其他一些根据operation. 有没有办法通过 EventBus 和分类器轻松实现这一目标?

谢谢,乔尔

4

2 回答 2

0

我同意 Konrad 的观点,即您应该实施新的LookupClassification总线来解决您的问题。我认为拥有这些总线的两个单独实例是最简单的,一个按 requesterId 分类,另一个按操作分类。这种方法的一些基本设置工作是:

//Singleton to hold the instances of each stream type
object ResponseEventStream{
  val RequestorIdStream = new RequestorIdResponseEventStream
  val OperationStream = new OperationResponseEventStream
}

//Common functionality for the two different types of streams
trait ResponseEventStream extends ActorEventBus with LookupClassification{
  import ResponseEventStream._
  type Event = Response
  type Classifier = String  
  protected def mapSize = 128
  protected def publish(resp:Response, subscriber: ActorRef) = {
    if (subscriber.isTerminated) unsubscribe(subscriber)
    else subscriber ! resp
  }  
}

//Concrete impl that uses requesterId to classify
class RequestorIdResponseEventStream extends ResponseEventStream{
  protected def classify(resp:Response) = resp.requesterId 
}

//Concrete impl that uses operation to classify
class OperationResponseEventStream extends ResponseEventStream{
  protected def classify(resp:Response) = resp.operation 
}

//Trait to mix into classes that need to publish or subscribe to response events
//Has helper methods to simplify interaction with the two distinct streams
trait ResponseEventing{
  import ResponseEventStream._

  def publishResponse(resp:Response){
    RequestorIdStream.publish(resp)
    OperationStream.publish(resp)
  }

  def subscribeByRequestId(requestId:String, ref:ActorRef){
    RequestorIdStream.subscribe(ref, requestId)
  }

  def subscribeByOperartion(op:String, ref:ActorRef){
    OperationStream.subscribe(ref, op)
  }  
}

然后,您只需将该ResponseEventing特征混合到需要发布Response事件或需要订阅事件的演员中。发布的 Actor 将调用publishResponse,而需要订阅的 Actor 将subscribeXXX根据他们感兴趣的分类(requesterId 或 operation)进行调用。

于 2014-11-25T15:35:58.133 回答
0

当然,它被称为LookupEventBus。您可以通过扩展它来实现自己的总线并requesterIdclassify方法中提取 ,如下所示:

class LookupBusImpl extends EventBus with LookupClassification {
  type Event = HasRequesterId // I made up a super type for you here
  type Classifier = String
  type Subscriber = ActorRef

  override def classify(event: HasRequesterId): String = event.requesterId

然后你会订阅一个给定的requesterId,像这样:

  lookupBus.subscribe(actorRef, "requester-100")

然后这个 Actor 将只接收分类为 的消息requester-100

于 2014-11-25T15:17:24.487 回答