43

最近,我一直在尝试围绕 Akka 和基于 actor 的系统的概念展开思考。虽然我现在对 Akka 基础知识有了很好的理解,但在集群和远程 Actor 方面,我仍然在一些事情上苦苦挣扎。

我将尝试使用Play Framework 2.0 附带的 WebSocket 聊天示例来说明这个问题:有一个角色持有 WebSocket,并保留了当前连接用户的列表。演员基本上在技术上和逻辑上都代表了聊天室。只要在单个服务器上运行一个聊天室,它就可以很好地工作。

现在我试图理解当我们谈论在服务器集群上运行的许多动态聊天室(可以随时打开/关闭新房间)时,如何扩展这个示例(添加或删除单个节点)根据当前需求)。在这种情况下,用户 A 可以连接到服务器 1,而用户 B 连接到服务器 2。两者可能在同一个聊天室中交谈。在每台服务器上,仍然会有一个参与者(对于每个聊天室?),它持有 WebSocket 实例以接收和发布事件(消息)给正确的用户。但从逻辑上讲,服务器 1 或服务器 2 上应该只有一个聊天室参与者保存当前连接的用户(或类似任务)的列表。

您将如何实现这一点,最好是在“纯 akka”中并且不添加像 ZeroMQ 或 RabbitMQ 这样的额外消息传递系统?

到目前为止,这是我想出的,请让我知道这是否有意义:

  1. 用户 A 连接到服务器 1,并且分配了一个持有他的 WebSocket 的演员。
  2. 参与者检查(使用路由器?EventBus?其他东西?)活动聊天室的“聊天室参与者”是否存在于任何连接的集群节点上。因为它不是,它会以某种方式请求创建一个新的聊天室参与者,并将向/从这个参与者发送和接收未来的聊天消息。
  3. 用户 B 在服务器 2 上连接,并且也为他的 WebSocket 分配了一个演员。
  4. 它还检查请求的聊天室的参与者是否存在于某处,并在服务器 1 上找到它。
  5. 服务器 1 上的聊天室参与者现在充当给定聊天室的中心,向所有“连接”的聊天成员参与者发送消息并分发传入的参与者。

如果服务器 2 出现故障,则必须以某种方式在服务器 2 上重新创建/移动聊天室参与者,尽管这不是我现在主要关心的问题。我最想知道的是,演员的这种动态发现如何传播到各种基本独立的机器上,可以使用 Akka 的工具集来完成。

我已经查看 Akka 的文档很长一段时间了,所以也许我在这里遗漏了明显的东西。如果是这样,请赐教:-)

4

3 回答 3

13

我正在从事一个私人项目,该项目基本上是聊天室示例的一个非常扩展的版本,而且我也遇到了 akka 的启动问题和整个“去中心化”思维。所以我可以告诉你我是如何“解决”我的扩展聊天室的:

我想要一个无需太多额外配置即可轻松部署多次的服务器。我使用 redis 作为所有打开的用户会话(其 ActorRef 的简单序列化)和所有聊天室的存储。

服务器有以下参与者:

  • WebsocketSession:它保持与一个用户的连接并处理来自用户的请求并转发来自系统的消息。
  • ChatroomManager:这是中央广播器,部署在服务器的每个实例上。如果用户想向聊天室发送消息,WebSocketSession-Actor 会将所有信息发送到 ChatroomManager-Actor,然后由 ChatroomManager-Actor 将消息广播给聊天室的所有成员。

所以这是我的程序:

  1. 用户 A 连接到分配新 WebsocketSession 的服务器 1。这个actor将这个actor的绝对路径插入到redis中。
  2. 用户 A 加入聊天室 X,该聊天室 X 还将他的绝对路径(我将其用作用户会话的唯一 ID)插入 redis(每个聊天室都有一个“连接”集)
  3. 用户 B 连接到服务器 2 -> redis
  4. 用户 B 加入聊天室 X -> redis
  5. 用户 B 向聊天室 X 发送消息如下:用户 B 通过 Websocket 将他的消息发送给他的会话参与者,会话参与者(经过一些检查)向 ChatroomManager 发送参与者消息。这个actor实际上从redis(与akka的actorFor-method一起使用的绝对路径)中检索聊天室的用户列表,然后将消息发送给每个会话actor。然后这些会话参与者写入他们的 websocket。

在每个 ChatroomManager-actor 中,我都会进行一些ActorRef缓存,从而提高速度。我认为这与您的方法不同,尤其是这些 ChatroomManager 处理所有聊天室的请求。但是一个聊天室只有一个演员是我想避免的单点故障。此外,这会导致更多消息,例如:

  • 用户 A 和用户 B 在服务器 1 上。
  • 聊天室 X 在服务器 2 上。

如果用户 A 想与用户 B 交谈,他们都必须通过服务器 1 上的聊天室参与者进行通信。

此外,我使用 akka 的功能(如(循环)路由器)在每个系统上创建 ChatroomManager-actor 的多个实例来处理许多请求。

我花了几天时间结合序列化和 redis 设置整个 akka 远程基础设施。但是现在我可以创建任意数量的服务器应用程序实例,这些实例使用 redis 来共享ActorRef(序列化为带有 ip+port 的绝对路径)。

这可能会进一步帮助您,并且我愿意接受新问题(请不要谈论我的英语;)。

于 2012-06-02T14:37:22.783 回答
10

跨多台机器横向扩展的关键是尽可能隔离可变状态。尽管您可以使用分布式缓存来协调所有节点的状态,但这会给您带来同步以及在扩展到大量节点时遇到的瓶颈问题。理想情况下,应该只有一个参与者知道聊天室中的消息和参与者。

您的问题的核心是,如果聊天室由在单台机器上运行的单个演员表示 - 或者确实存在这样的房间。诀窍是使用标识符(例如聊天室的名称)路由与给定聊天室相关的请求。计算名称的哈希值,并根据数字从 n 个框中选择一个。该节点将了解其当前的聊天室,并可以安全地为您找到或创建正确的聊天室参与者。

您可以查看以下讨论 Akka 中的集群和横向扩展的博客文章:

http://blog.softmemes.com/2012/06/16/clustered-akka-building-akka-2-2-today-part-1/

http://blog.softmemes.com/2012/06/16/clustered-akka-building-akka-2-2-today-part-2/

于 2012-06-20T08:44:16.657 回答
7

我会使用 Zookeeper+Norbert 来了解哪些主机正在上升和下降:

http://www.ibm.com/developerworks/library/j-zookeeper/

现在我的聊天室服务器场中的每个节点都可以知道逻辑集群中的所有主机。当节点离线(或上线)时,他们会收到回调。任何节点现在都可以保留当前集群成员的排序列表,对聊天室 ID 进行哈希处理,并按列表大小进行 mod 以获得列表中的索引,该索引应该是托管任何给定聊天室的节点。我们可以加 1 并 rehash 来选择第二个索引(需要一个循环,直到你得到一个新的索引)来计算第二个主机来保存聊天室的第二个副本以实现冗余。在两个聊天室主机中的每一个上都有一个聊天室参与者,它只是将所有聊天消息转发给每个作为聊天室成员的 Websocket 参与者。

现在我们可以使用自定义 Akka 路由器通过两个活跃的聊天室参与者发送聊天消息。客户端只发送一次消息,路由器将执行哈希模块并发送给两个远程聊天室参与者。我会使用 twitter 雪花算法为正在发送的消息生成唯一的 64 位 ID。请参阅以下链接中代码的 nextId() 方法中的算法。可以使用 norbert 属性设置 datacenterId 和 workerId 以确保在不同的服务器上不会生成冲突 ID:

https://github.com/twitter/snowflake/blob/master/src/main/scala/com/twitter/service/snowflake/IdWorker.scala

现在,每条消息的两个副本将通过两个活跃的聊天室参与者中的每一个发送到每个客户端端点。在每个 Websocket 客户端参与者中,我将取消对雪花 ID 的位掩码,以了解发送消息的 datacenterId+workerId 编号,并跟踪从集群中每个主机看到的最高聊天消息编号。然后我会忽略任何不高于给定发送方主机的给定客户端已经看到的消息。这将对通过两个活动聊天室参与者传入的消息对进行重复数据删除。

到目前为止,一切都很好; 我们将拥有弹性消息传递,因为如果任何节点死亡,我们不会丢失聊天室的一个幸存副本。消息将自动通过第二个聊天室不间断地流动。

接下来,我们需要处理从集群中退出或重新添加到集群中的节点。我们将在每个节点内收到一个 norbert 回调,以通知我们有关集群成员资格的更改。在这个回调中,我们可以通过自定义路由器发送一条 akka 消息,说明新的成员列表和当前主机名。当前主机上的自定义路由器将看到该消息并更新其状态以了解新的集群成员,以计算新的节点对以发送任何给定的聊天室流量。新集群成员身份的确认将由路由器发送到所有节点,以便每个服务器都可以跟踪所有服务器何时赶上成员身份更改并现在正确发送消息。

会员变更后,幸存的聊天室可能仍处于活动状态。在这种情况下,所有节点上的所有路由器将继续正常向其发送消息,但也会推测性地向新的第二个聊天室主机发送消息。第二个聊天室可能尚未启动,但这不是问题,因为消息将通过幸存者流动。如果在成员资格更改后幸存的聊天室不再处于活动状态,则所有主机上的所有路由器将首先发送到三个主机;幸存者和两个新节点。可以使用 akka 死亡监视机制,以便所有节点最终都可以看到幸存的聊天室关闭,从而通过两个主机重新路由聊天流量。

接下来,我们需要根据情况将聊天室从幸存的服务器迁移到一两个新主机上。幸存的聊天室参与者将在某个时候收到一条消息,告诉它新的集群成员身份。它将首先向新节点发送聊天室成员的副本。此消息将在新节点上创建具有正确成员资格的聊天室参与者的新副本。如果幸存者不再是应该拥有聊天室的两个节点之一,它将进入退役模式。在退役模式下,它只会将任何消息转发到新的主节点和辅助节点,而不是任何聊天室成员。Akka 消息转发非常适合这一点。

退役聊天室将监听来自每个节点的 norbert 集群成员身份确认消息。最终它将看到集群中的所有节点都已经确认了新的集群成员。然后它知道它将不再接收任何要转发的消息。然后它可以杀死自己。Akka 热交换非常适合实现退役行为。

到目前为止,一切都很好; 我们有一个弹性消息设置,不会因为节点崩溃而丢失消息。在集群成员发生变化时,我们将获得节点内流量的激增,以将聊天室复制到新节点。我们还有一连串的节点内消息转发到节点,直到所有服务器都赶上哪些聊天室已经移动了两个哪些服务器。如果我们想扩展系统,我们可以等到用户流量的低点,然后打开一个新节点。聊天室将自动在新节点之间重新分配。

以上描述基于阅读以下论文并将其翻译成akka概念:

https://www.dropbox.com/s/iihpq9bjcfver07/VLDB-Paper.pdf

于 2013-07-02T21:47:26.273 回答