5

我有一个包含 500 个输入文件的文件夹(所有文件的总大小约为 500 [MB])。

我想编写一个python执行以下操作的脚本:

(1)将所有输入文件加载到内存

(2)python初始化一个稍后将使用的空列表...参见项目符号(4)

(3)启动 15 个不同的(独立的)进程:每个进程都使用相同的输入数据 [from (1)] -- 但使用不同的算法对其进行处理,从而产生不同的结果

(4)我希望 [来自步骤(3)] 的所有独立进程将它们的输出存储在同一个python列表中 [在步骤 中初始化的相同列表(2)]

一旦所有 15 个进程都完成了它们的运行,我将拥有one python list包括所有 15 个独立进程的结果。

我的问题是,是否可以有效地完成上述工作python?如果是这样,您能否提供一个方案/示例代码来说明如何做到这一点?

注意#1:我将在强大的多核服务器上运行它;所以这里的目标是使用所有处理能力,同时在所有独立进程之间共享一些内存 { input data, }。output list

注意#2:我在一个Linux环境中工作

4

1 回答 1

6

好的,我刚刚使用zeromq将一个订阅者展示给多个发布者。您可能可以对队列做同样的事情,但您需要对它们进行更多管理。zeromq 套接字可以正常工作,这对于 IMO 之类的东西来说非常有用。

"""
demo of multiple processes doing processing and publishing the results
to a common subscriber
"""
from multiprocessing import Process


class Worker(Process):
    def __init__(self, filename, bind):
        self._filename = filename
        self._bind = bind
        super(Worker, self).__init__()

    def run(self):
        import zmq
        import time
        ctx = zmq.Context()
        result_publisher = ctx.socket(zmq.PUB)
        result_publisher.bind(self._bind)
        time.sleep(1)
        with open(self._filename) as my_input:
            for l in my_input.readlines():
                result_publisher.send(l)

if __name__ == '__main__':
    import sys
    import os
    import zmq

    #assume every argument but the first is a file to be processed
    files = sys.argv[1:]

    # create a worker for each file to be processed if it exists pass
    # in a bind argument instructing the socket to communicate via ipc
    workers = [Worker(f, "ipc://%s_%s" % (f, i)) for i, f \
               in enumerate((x for x in files if os.path.exists(x)))]

    # create subscriber socket
    ctx = zmq.Context()

    result_subscriber = ctx.socket(zmq.SUB)
    result_subscriber.setsockopt(zmq.SUBSCRIBE, "")

    # wire up subscriber to whatever the worker is bound to 
    for w in workers:
        print w._bind
        result_subscriber.connect(w._bind)

    # start workers
    for w in workers:
        print "starting workers..."
        w.start()

    result = []

    # read from the subscriber and add it to the result list as long
    # as at least one worker is alive
    while [w for w in workers if w.is_alive()]:
        result.append(result_subscriber.recv())
    else:
        # output the result
        print result

哦,只是为了得到 zmq

$ pip install pyzmq-static
于 2011-07-01T00:33:52.293 回答