9

是否可以在多个消费者之间“流水线化”使用发电机?

例如,具有这种模式的代码很常见:

def consumer1(iterator):
    for item in iterator:
        foo(item)

def consumer2(iterator):
    for item in iterator:
        bar(item)

myiter = list(big_generator())
v1 = consumer1(myiter)
v2 = consumer2(myiter)

在这种情况下,多个函数完全使用同一个迭代器,因此需要将迭代器缓存在列表中。由于每个消费者都耗尽了迭代器,itertools.tee所以没有用。

我经常看到这样的代码,我总是希望我能让消费者一次消费一个项目,而不是缓存整个迭代器。例如:

  1. consumer1消耗myiter[0]
  2. consumer2消耗myiter[0]
  3. consumer1消耗myiter[1]
  4. consumer2消耗myiter[1]
  5. ETC...

如果我要组成一个语法,它看起来像这样:

c1_retval, c2_retval = iforkjoin(big_generator(), (consumer1, consumer2))

您可以接近线程或多处理和teed 迭代器,但线程消耗的速度不同,这意味着缓存在内部的值 dequetee可能会变得非常大。这里的重点不是利用并行性或加速任务,而是避免缓存迭代器的大部分。

在我看来,如果不修改消费者,这可能是不可能的,因为控制流在消费者手中。但是,当消费者实际使用迭代器时,控制权会传递到迭代器的next()方法中,所以也许可以以某种方式反转控制流,以便迭代器一次阻塞消费者,直到它可以喂饱它们?

如果这是可能的,我还不够聪明,不知道怎么做。有任何想法吗?

4

2 回答 2

1

这不行吗?或者您是否需要整个迭代器,所以每个这样的副本都行不通?如果是这样,那么我认为您要么必须创建一个副本,要么生成两次列表?

for item in big_generator():
    consumer1.handle_item(item)
    consumer2.handle_item(item)
于 2013-03-24T03:57:54.533 回答
1

由于不更改消费者代码的限制(即其中有一个循环),您只剩下两个选择:

  1. 您已经在问题中包含的方法:将生成的项目缓存在内存中,然后对其进行多次迭代。
  2. 在一个线程中运行每个消费者,并实现某种 synchronized- itertools.tee,一个大小为 1 的缓冲区,它阻塞服务项目i+1,直到项目i已提供给所有消费者。

没有其他选择。您无法实现以下所有目标,因为它们相互矛盾:

  1. 有发电机
  2. 有一个循环来消耗所有这些
  3. 然后,(串行)在前一个循环完成后,有另一个循环再次消耗所有它
  4. 仅在使用时将 O(1) 项保存在内存(或磁盘等)中
  5. 不重新生成(即不重新创建生成器)

如果要重复使用生成的项目,则必须将它们存储在某个地方。

如果更改消费者的代码是可以接受的,显然@monkey 的解决方案是最简单和最直接的。

于 2013-03-24T06:39:43.397 回答