3

我正在使用扭曲从连接互联网的传感器获取消息,以便将其存储到数据库中。
我想在不干扰这些过程的情况下检查这些消息,因为我需要将每条消息与 db 中的一些基值进行比较,如果一些匹配,我需要为此触发警报,并且这个想法不是阻止任何进程......

我的想法是创建一个新流程来检查和提醒,但我需要在第一个流程存储消息后,它将消息发送到新流程,以便在需要时进行检查和提醒。

为此我需要IPC,我正在考虑使用ZeroMQ,但也扭曲了一种使用IPC的方法,我想如果我使用ZeroMQ,但也许它会弄巧成拙......

你觉得我的方法怎么样?也许我完全错了?

欢迎任何建议.. 感谢

PD:此过程将在专用服务器上运行,预期负载为 6000 msg/小时,每个 1Kb

4

2 回答 2

3

所有这些方法都是可能的。我只能抽象地说,因为我不知道你的应用程序的精确轮廓。

如果您已经有一个工作应用程序,但它的速度不足以处理您向其发送的消息数量,那么请确定瓶颈。阻塞的两个可能原因是数据库访问或警报触发,因为其中任何一个都可能是同步 IO 操作。

你如何处理这个取决于你的工作量:

  1. 如果您的消息速率很高且恒定,那么您需要确保您的数据库可以处理此速率。如果您的数据库无法处理它,那么再多的非阻塞消息传递将帮助您!按此顺序:
    1. 尝试调整您的数据库。
    2. 尝试将您的数据库放在具有更多内存的更大计算机上。
    3. 尝试在多台机器上分片您的数据库以分配工作负载。一旦您知道您的数据库可以处理消息速率,您就可以使用其他形式的并行处理其他瓶颈。
  2. 如果您的消息速率是突发的,那么您可以使用队列来处理突发。按此顺序:
    1. 将负载平衡器放在消息处理器集群的前面。这个平衡器应该做的就是将传感器消息重新分发到不同的机器以进行检查和警报处理。这种方法的优点是您可能不需要更改现有应用程序,只需在更多机器上运行它即可。如果您的负载均衡器不需要等待响应,只需转发消息,这将最有效。
    2. 如果您的通信需求更复杂或者是双向的,您可以使用消息总线(例如 ZeroMQ)作为消息处理器、警报发送器和数据库检查器之间的通信层。这个想法是通过通过总线进行非阻塞通信并让总线上的每个节点只做一件事来增加并行性。然后,您可以根据每个消息处理阶段花费的时间来更改节点类型的比率。(即使整个消息处理过程中的队列深度相等。)
于 2013-02-08T00:15:00.913 回答
1

当你收到消息时,做两件事:

  • 检查它是否应该触发警报(并在必要时发送警报,大概)
  • 将其插入数据库

您不需要消息队列、多个进程、IPC 或任何这些东西。例如:

def messageReceived(self, message):
    self.checkForAlerts(message).addCallbacks(self.maybeAlert, log.err)
    self.saveMessageToDatabase(message).addErrback(log.err)
于 2013-02-07T22:51:28.587 回答