10

我不明白为什么Pipes当有多个发送者和接收者时说不安全。

Queues如果是这种情况,如何将以下代码转换为代码?Queues关闭时不要扔EOFError,所以我的进程不能停止。我是否应该无休止地发送“毒药”消息来告诉他们停止(这样,我确信我的所有进程都至少收到一种毒药)?

我想保持管道p1打开,直到我另有决定(这里是我发送 10 条消息的时间)。


from multiprocessing import Pipe, Process
from random import randint, random
from time import sleep

def job(name, p_in, p_out):
    print(name + ' starting')
    nb_msg = 0
    try:
        while True:
            x = p_in.recv()
            print(name + ' receives ' + x)
            nb_msg = nb_msg + 1
            p_out.send(x)
            sleep(random())
    except EOFError:
        pass
    print(name + ' ending ... ' + str(nb_msg) + ' message(s)')

if __name__ == '__main__':
    p1_in, p1_out = Pipe()
    p2_in, p2_out = Pipe()

    proc = []

    for i in range(3):
        p = Process(target=job, args=(str(i), p1_out, p2_in))
        p.start()
        proc.append(p)

    for x in range(10):
        p1_in.send(chr(97+x))
    p1_in.close()
    for p in proc:
        p.join()
    p1_out.close()
    p2_in.close()

    try:
        while True:
            print(p2_out.recv())
    except EOFError:
        pass

    p2_out.close()
4

2 回答 2

17

本质上,问题在于它Pipe是围绕平台定义的管道对象的薄包装器。recv只需重复接收字节缓冲区,直到获得完整的 Python 对象。如果两个线程或进程recv在同一管道上使用,则读取可能会交错,使每个进程都有半个腌制对象,从而破坏数据。Queue以增加复杂性为代价,在进程之间进行适当的同步。

正如multiprocessing文档所说:

请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程没有损坏的风险。

不必无休止地送毒丸;您只需要每个工人一个。每个工人在离开前都拿起一粒毒丸,因此工人不会以某种方式错过消息的危险。

您还应该考虑使用multiprocessing.Pool而不是重新实现“工作进程”模型——Pool有很多方法可以很容易地在多个线程之间分配工作。

于 2012-09-18T20:37:37.297 回答
8

我不明白为什么当有多个发送者和接收者时,管道被称为不安全。

假设您同时将水从源 A 和 B 注入管道。在管道的另一端,你不可能找出哪一部分水来自 A 或 B,对吧?:)

管道在字节级别传输数据流。如果没有通信协议,它不知道消息是什么,因此无法确保消息的完整性。因此,使用具有多个发送者的管道不仅是“不安全的”。这是一个主要的设计缺陷,很可能会导致沟通问题。

但是,队列是在更高级别上实现的。它们是为传递消息(甚至是抽象对象)而设计的。队列用于保持消息/对象自包含。多个来源可以将对象放入队列中,并且多个消费者可以拉出这些对象,同时 100% 确保作为一个单元进入队列的任何内容也作为一个单元从队列中出来。

一段时间后编辑:

我应该在字节流中补充一点,所有字节的检索顺序与发送的顺序相同(保证)。多个发送者的问题是发送顺序(输入顺序)可能已经不清楚或随机,即多个流可能以不可预测的方式混合。

一个通用的队列实现保证单个消息保持完整,即使有多个发送者。消息也按发送顺序检索。然而,对于多个竞争的发送者并且没有进一步的同步机制,同样不能保证输入消息的顺序。

于 2012-09-18T22:13:31.273 回答