16

关于如何在akka中使用事件总线有什么好的教程/解释吗?我已经阅读了 Akka 文档,但我发现很难理解如何使用事件总线

4

2 回答 2

42

不确定是否有任何好的教程,但我可以给你一个可能的用户案例的快速示例,其中使用事件流可能会有所帮助。不过,在较高级别上,事件流是一种很好的机制,可以满足您的应用程序可能具有的发布/订阅类型要求。假设您有一个用例,您可以在系统中更新用户的余额。余额经常被访问,因此您决定缓存它以获得更好的性能。当余额更新时,您还想检查用户的余额是否超过阈值,如果是,请通过电子邮件发送给他们。您不希望将缓存删除或余额阈值检查直接绑定到主余额更新调用中,因为它们可能很重并会减慢用户的响应速度。

//Message and event classes
case class UpdateAccountBalance(userId:Long, amount:Long)
case class BalanceUpdated(userId:Long)

//Actor that performs account updates
class AccountManager extends Actor{
  val dao = new AccountManagerDao

  def receive = {
    case UpdateAccountBalance(userId, amount) =>
      val res = for(result <- dao.updateBalance(userId, amount)) yield{
        context.system.eventStream.publish(BalanceUpdated(userId))
        result                
      }

      sender ! res
  }
}

//Actor that manages a cache of account balance data
class AccountCacher extends Actor{
  val cache = new AccountCache

  override def preStart = {
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      cache.remove(userId)
  }
}

//Actor that checks balance after an update to warn of low balance
class LowBalanceChecker extends Actor{
  val dao = new LowBalanceDao

  override def preStart = {
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      for{
        balance <- dao.getBalance(userId)
        theshold <- dao.getBalanceThreshold(userId)
        if (balance < threshold)
      }{
        sendBalanceEmail(userId, balance)
      }
  }
}

在此示例中,参与者AccountCacherLowBalanceChecker参与者都订阅eventStream了事件的按类类型BalanceUpdated。如果此事件是发布到流的事件,则这两个参与者实例都将接收到它。然后,在 中AccountManager,当余额更新成功时,它会BalanceUpdated为用户引发一个事件。当这种情况发生时,并行地,该消息被传送到邮箱,AccountCacherLowBalanceChecker导致余额从缓存中删除,并检查帐户阈值,并可能发送一封电子邮件。

现在,您可以直接tell (!)调用AccountManager来与其他两个参与者直接通信,但有人可能会争辩说,这可能过于紧密地耦合了平衡更新的这两个“副作用”,并且这些类型的细节不会必然属于AccountManager。如果您的情况可能会导致一些额外的事情(检查、更新等)需要纯粹作为副作用发生(不是核心业务流程本身的一部分),那么事件流可能是一个好方法解耦正在引发的事件以及谁可能需要对该事件做出反应。

于 2013-04-29T00:31:40.980 回答
11

EventBus每个ActorSystem. _ 这EventBus称为事件流,可以通过调用system.eventStream.

ActorSystem 将事件流用于许多事情,包括记录、发送死信集群事件

您还可以将事件流用于您自己的发布/订阅需求。例如,事件流在测试期间可能很有用。为某些事件(例如日志记录事件)订阅测试工具包到事件流,您就可以。当发生某些事情时您不会向另一个参与者发送消息但您仍然需要在测试中期待该事件时,这可能特别有用。 testActorexpect

请注意,事件流仅在一个ActorSystem. 如果您使用在流上发布的远程处理事件,则默认情况下不要跨到远程系统(尽管您可以自己添加该支持)。

EventBus如果您不想使用事件流,理论上可以创建一个单独的。

正在为 Akka 2.2 编写更好的 Event Bus 文档,因此请在此票证完成后再次查看。

于 2013-04-29T21:36:49.080 回答