6

我正在重新开发一个系统,该系统将通过 http 向众多供应商之一发送消息。原来是 perl 脚本,很可能重新开发也会使用 perl。

在旧系统中,有多个 perl 脚本同时运行,每个供应商五个。当一条消息被放入数据库时​​,随机线程号(1-5)和供应商被选择以确保没有消息被处理两次,同时避免锁定表/行。此外,数据库中有一个“公平队列位置”字段,以确保大消息发送不会延迟发送大消息时发生的小发送。

有时每分钟只有几条消息,但在其他时候,可能会有数十万条消息的转储。在我看来,让所有脚本一直运行并检查消息是一种资源浪费,所以我正在尝试确定是否有更好的方法来做这件事,或者旧的方法是否可以接受。

我现在的想法是让一个脚本根据流量的多少来运行和分叉尽可能多的子进程(达到一个限制),但我不确定如何最好地实现它,以便每个消息只处理一次,同时保持公平排队。

我现在最好的猜测是父脚本更新数据库以指示哪个子进程应该处理它,但是我担心这最终会比原始方法效率低。我几乎没有编写分叉代码的经验(我上次这样做是在大约 15 年前)。

任何关于如何最好地处理消息队列的想法或指南链接表示赞赏!

4

3 回答 3

8

您可以使用 Thread::Queue 或任何其他:Perl 有多处理模块吗?

如果旧系统是以这种方式用 Perl 编写的,那么您可以重用它的大部分内容。

非工作示例:

use strict;
use warnings;

use threads;
use Thread::Queue;

my $q = Thread::Queue->new();    # A new empty queue

# Worker thread
my @thrs = threads->create(sub {
                            while (my $item = $q->dequeue()) {
                                # Do work on $item
                            }
                         })->detach() for 1..10;#for 10 threads
my $dbh = ...
while (1){
  #get items from db
  my @items = get_items_from_db($dbh);
  # Send work to the thread
  $q->enqueue(@items);
  print "Pending items: "$q->pending()."\n";
  sleep 15;#check DB in every 15 secs
}
于 2012-10-31T17:19:07.483 回答
6

我建议使用像 RabbitMQ 这样的消息队列服务器。

一个进程将工作送入队列,您可以让多个工作进程使用队列。

这种方法的优点:

  • 工人在等待工作时阻塞(无忙等待)
  • 如果需要,可以手动启动更多工作进程
  • 工作进程不必是特殊父进程的子进程
  • RabbitMQ 会将工作分配给所有准备好接受工作的工作人员
  • 如果 worker 没有返回 ACK,RabbitMQ 会将工作放回队列
  • 您不必在数据库中分配工作
  • 每个“代理”(工人、生产者等)都是一个独立的进程,这意味着您可以杀死它或重新启动它而不影响其他进程

要动态扩大或缩小工人数量,您可以实现以下内容:

  1. 如果工人在指定的时间内没有工作,他们会自动死亡
  2. 让另一个进程监控队列的长度并在队列变得太大时产生更多的工作人员
于 2012-10-31T16:28:49.243 回答
1

我建议将beanstalkd用于专用作业服务器,并在 perl 脚本中使用Beanstalk::Client将作业添加到队列并删除它们。

与 RabbitMQ 相比,您应该会发现 beanstalkd 更易于安装和设置。它还将负责在可用的工作人员之间分配工作,隐藏任何失败的工作以便稍后重试,安排工作在以后完成,以及更多基本功能。对于您的工人,您不必担心分叉或线程;只需在可用的服务器上启动尽可能多的工作人员即可。

RabbitMQ 或 Beanstalk 都比滚动您自己的 db-backed 解决方案要好。这些项目已经解决了排队所需的许多细节,并实现了您可能尚未意识到您想要的功能。与休眠和从数据库中选择以查看是否有更多工作相比,他们还应该更有效地处理新工作的轮询。

于 2012-10-31T21:16:19.977 回答