2

我正在尝试使用 python 的多处理模块在几台机器上运行分布式任务,我一直在使用这篇博文作为参考。

但是,这篇文章的任务使用了一个作业队列,并将结果放入一个结果队列中,这两个队列都由 JobQueueManager(它是 SyncManager 的子类)管理。这个管理器有一个服务器,它启动并持续运行,直到结果队列被填满,当它调用manager.shutdown().

我的问题是我的任务不需要结果队列,所以我试图弄清楚如何知道何时停止服务器。我可以让服务器持续运行serve_forever,然后手动停止它,或者创建一个虚拟队列,以与示例中相同的方式填充,并在服务器与原始作业数量一样大时停止服务器。

我不想手动停止它,但第二种解决方案似乎相当老套。似乎一种常见的方法(没有服务器)是调用join()每个进程,但我不知道管理器是否有办法找出哪个进程从队列中删除了每个作业。

我的后备计划是虚拟队列方法的一种变体,但是共享计数器变量作为每个进程的最后一步递增,但我想知道是否有任何建议使用多处理库中的方法,或者是否这是不可靠的。

谢谢

编辑:我没有提到我不使用结果队列的原因是我将处理结果存储到 Redis 数据库。

4

2 回答 2

1

在给出的示例中:

outdict = shared_result_q.get()

结果队列用于异步等待结果。这是主要的沟通方式。没有它,您需要另一种信号机制来确认任务结束事件。只需None排队即可。

于 2012-10-20T18:58:01.577 回答
0

正如我的更新所示,我已经使用了一个 redis 数据库来存储我的任务结果,所以我不必担心管理不同机器之间的字典。

我最终采用的解决方案也使用了 Redis 数据库。每当每个进程完成时,我都会将带有进程信息的字符串推送到列表中(r_server.lpush(...)redis-py中)。在服务器端,我没有get对结果队列使用阻塞方法,而是使用 Redis 的阻塞弹出rs.blpop(),它的工作方式相同。

这与这里的博客文章和其他建议几乎相同,以创建一个虚拟队列并使用get(),但只是使用 redis,所以我没有额外方法参数和向管理器注册额外方法的开销。

于 2012-10-24T01:26:29.177 回答