0

我试图找出构建我的应用程序的最佳方法,以便我可以以冗余方式从我的播放框架应用程序发送推送通知。

我想实现一个“休息期”,在用户修改数据后 30 秒向移动设备发送推送通知。如果用户在这 30 秒内进行了另一次修改,则需要取消原始通知并替换为应在最近一次修改后 30 秒发送的新通知,依此类推。

问题是我的 API 后端需要相互通信以确保它们不会在 30 秒内发送多个通知,因为它们是负载平衡的。例如:

  1. User1 进行了修改,该修改被发送到 API Server1。在 30 秒内触发通知。
  2. User1 在 5 秒后对同一记录进行第二次修改,最终被路由到 API Server2。另一个通知被触发在 30 秒后发送,因为它不知道 Server1 收到的信息。

这是不正确的行为 - User1 应该只收到一个通知,因为修改发生时数据没有“静止”30 秒。

由于我对 Akka 不是特别熟悉,这似乎是一个很好的学习机会。看起来我可以用 Akka 远程处理来解决这个问题。

这是我能想到的最简单的架构:

  • 在 API 的每个实例中创建一个 akka 系统(“通知”),并使用路由器将消息发送到每个 API 实例,每个 API 实例都有一个 Akka 演员(“通知演员”)

我的 application.conf 看起来像这样:

akka {

  actor {
    provider = "akka.remote.RemoteActorRefProvider"

    deployment {
      /router {
        router = round-robin-group
        routees.paths = [
          "akka.tcp://notifications@server1:2552/user/notificationActor",
          "akka.tcp://notifications@server2:2552/user/notificationActor"]
      }
    }
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "server1" // set to server1 or server 2 upon deployment
      port = 2552
    }
  }
}

我正在像这样设置系统、演员和路由器:

// the system, instantiated once per server
private val system = ActorSystem("notifications")

// the local actor, instantiated once per server
private val notificationActor = system.actorOf(Props[NotificationActor], name = "notificationActor")

// the router, instantiated once per server
private val router = system.actorOf(FromConfig.props(Props[NotificationActor]), name = "router")

当我需要发送通知时,我会告诉我的演员安排它。这样每个系统都可以保留键/值对中的 Cancelable 实例,并在数据在不同服务器上更新时取消通知:

Client.scala(近似值,可能有错别字)

def updateData(data: DbData) = {
   // update data in db
   NotificationController.sendMessage(Notification(data.key))
}

NotificationController.scala(近似值,可能有错别字)

def sendMessage(pushNotification: Notification) = {

   // cancel all notifications related to this data on all servers
   router ! Broadcast(CancelNotification(pushNotification))

   // schedule the new notification on the next available actor
   router ! ScheduleNotification(pushNotification)
}

CancelNotification.scala(近似值,可能有错别字)

case class CancelNotification(pushNotification: Notification)

ScheduleNotification.scala(近似值,可能有错别字)

case class ScheduleNotification(pushNotification: Notification)

NotificationActor.scala(近似值,可能有错别字)

val cancellableMap: Map[String, Cancellable] = // (new concurrent hash map)
def receive: Receive = {
  case ScheduleNotification(pushNotification) => //uses - this.context.system.scheduler.scheduleOnce and stores the key/Cancellable pair in cancellableMap
  case CancelNotification(pushNotification) => //use stored Cancellable to cancel notification, if present
  case Notification(key) => //actually send the push notification
}

这在本地工作得很好,但是一旦我将它部署到我的测试环境(使用多台机器),其他所有消息似乎都丢失了。我认为这是因为它试图将这些消息发送到 Server2,但我在任何一个应用程序的日志文件中都没有看到任何错误。我尝试在我的 akka 配置中添加更多日志记录,但在 logs/application.log (默认播放框架日志)中没有看到任何额外的输出:

akka {

  loglevel = "DEBUG"
  log-config-on-start = on

  actor {
    provider = "akka.remote.RemoteActorRefProvider"

    deployment {
      /router {
        router = round-robin-group
        routees.paths = [
          "akka.tcp://notifications@server1:2552/user/notificationActor",
          "akka.tcp://notifications@server2:2552/user/notificationActor"]
      }
    }

    debug {
      unhandled = on
    }
  }
  remote {
    log-sent-messages = on
    log-received-messages = on
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "server1" // set to server1 or server 2 upon deployment
      port = 2552
    }
  }
}

为什么 Server2 收不到消息?我可以在每个实例上使用来自所有服务器的演员来实例化一个演员系统吗?他们应该能够交叉通信吗?

此外,如果我对此过于复杂,我愿意接受其他解决方案。如果我能让它工作,这似乎是最简单的方法。

4

1 回答 1

0

我想我想通了。这种架构似乎可以工作,但我有两个问题,我通过从本地计算机运行它并将 server1 和 server2 添加到我的配置来解决。

  1. 当应用程序启动时,我没有实例化我的演员系统——我使用的是惰性 val,这意味着演员还没有准备好,因为它没有收到会导致它被创建的请求。换句话说,Server2 上的 notificationActor 实例没有在监听,因为它还没有初始化。

    为了解决这个问题,我将所有与 akka 相关的组件(系统、actor 和路由器)的初始化移到了我的 GlobalSettings 类的 onStart 方法中,这样它就可以在播放应用程序启动时立即运行。这是首要问题。

  2. 我的消息不可序列化。

于 2015-12-16T08:18:27.463 回答