3

这是我的代码,它应该做一些与其他问题试图做的非常相似的事情,特别是这个图是相关的:流程图 f1 = 生产,f2 = f3 = 工人,f4 = 消费者。

我还没有尝试解决完美结束一切的问题,这不是这个问题的意义所在。我收到错误“RuntimeError:队列对象只能通过继承在进程之间共享”而且我不确定如何修复它。我只是想将队列传递给像 Go 的频道这样的函数,真的。这是代码。

import multiprocessing


def produce(n, queue):
    for i in xrange(n):
        queue.put(i)

def worker(in_queue, out_queue):
    for i in iter( in_queue.get, None):
        out_queue.put(i*i)

def consumer(queue):
    ans = []
    for i in iter( queue.get, None):
        ans.append(i)
    return ans


def main(n):
    pool = multiprocessing.Pool(4)
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()

    pool.apply_async(produce, (n, in_queue))
    for i in range(2):
        pool.apply_async(worker, (in_queue, out_queue))
    result = consumer(out_queue)
    pool.close()
    pool.join()
    return result

main(200)

我将如何解决它?

有更简单的方法吗?

我试过Pool.map了,但我想让这个工作。

4

2 回答 2

2

Amultiprocessing.Pool已经设置了必要的 IPC 机制,允许您在启动后向其工作人员提交作业,但您不能稍后将 Queue 或类似的参数作为参数传递。这就是你的代码不起作用的原因。在启动子进程时,它必须知道如何与其父进程通信。

所以如果你需要设置自己的Queues,你应该multiprocessing.Process直接使用。此外,您正在编写的是典型的工作人员,它们循环等待新工作并处理它们。在一个工人池上运行这样一个工人不是你想做的事情。

这样你的代码就可以工作了:

import multiprocessing


def produce(n, queue):
    for i in xrange(n):
        queue.put(i)

def worker(in_queue, out_queue):
    for i in iter( in_queue.get, None):
        out_queue.put(i*i)

def consumer(queue):
    ans = []
    for i in iter( queue.get, None):
        print(i)
        ans.append(i)
    return ans


def main(n):
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()
    producer = multiprocessing.Process(target=produce, args=(n, in_queue))
    for i in range(2):
        w = multiprocessing.Process(target=worker, args=(in_queue, out_queue))
        w.start()
    producer.start()
    res = consumer(out_queue)

main(200)

我在您的文件中添加了一条打印声明,consumer以表明发生了一些事情。该consumer函数永远不会终止,因为您从队列中读取的代码等待一个None永远不会出现的终止,因为工人和生产者都没有将一个放入队列......

于 2013-06-25T19:36:47.683 回答
0

使用multiprocessing.Manager它可以跨池进程共享队列。我已经更新了 OP 的代码以使用pool.apply_async方法。为此,我曾经None在流程方法中指示任务完成,并添加代码以等待 main 方法中的异步调用完成。

import multiprocessing

def produce(n, queue):
    for i in xrange(n):
        queue.put(i)
    # None indicates termination / completion
    queue.put(None)

def worker(in_queue, out_queue):
    while True:
        i = in_queue.get()
        if i is None:
            # Put the None back for other workers listening to the queue to see
            in_queue.put(None)
            out_queue.put(None)
            return
        out_queue.put(i*i)

def consumer(queue):
    ans = []
    while True:
        i = queue.get()
        if i is None:
            queue.put(None)
            return ans
        ans.append(i)

def main(n):
    manager = multiprocessing.Manager()
    in_queue = manager.Queue()
    out_queue = manager.Queue()

    pool = multiprocessing.Pool(4)

    async_results = []
    ar = pool.apply_async(produce, (n, in_queue))
    async_results.append(ar)
    for i in range(2):
        ar = pool.apply_async(worker, (in_queue, out_queue))
        async_results.append(ar)
    ar = pool.apply_async(consumer, (out_queue,))
    async_results.append(ar)

    for ar in async_results:
        ar.wait()

    result = async_results[len(async_results)-1].get()
    print(result)

    pool.close()
    pool.join()
    return result

if __name__ == "__main__":
    main(200)
于 2017-07-21T06:58:37.487 回答