我有一个由多个产量返回的生成器对象。准备调用这个生成器是相当耗时的操作。这就是为什么我想多次重用生成器。
y = FunctionWithYield()
for x in y: print(x)
#here must be something to reset 'y'
for x in y: print(x)
当然,我正在考虑将内容复制到简单列表中。有没有办法重置我的发电机?
发电机不能重绕。您有以下选择:
再次运行生成器函数,重新开始生成:
y = FunctionWithYield()
for x in y: print(x)
y = FunctionWithYield()
for x in y: print(x)
将生成器结果存储在内存或磁盘上的数据结构中,您可以再次迭代:
y = list(FunctionWithYield())
for x in y: print(x)
# can iterate again:
for x in y: print(x)
选项1的缺点是它再次计算值。如果那是 CPU 密集型的,你最终会计算两次。另一方面,2的缺点是存储。整个值列表将存储在内存中。如果有太多的值,那可能是不切实际的。
所以你有经典的记忆与处理权衡。我无法想象一种在不存储值或再次计算它们的情况下倒带生成器的方法。
另一种选择是使用该itertools.tee()
函数创建生成器的第二个版本:
import itertools
y = FunctionWithYield()
y, y_backup = itertools.tee(y)
for x in y:
print(x)
for x in y_backup:
print(x)
如果原始迭代可能无法处理所有项目,则从内存使用的角度来看,这可能是有益的。
>>> def gen():
... def init():
... return 0
... i = init()
... while True:
... val = (yield i)
... if val=='restart':
... i = init()
... else:
... i += 1
>>> g = gen()
>>> g.next()
0
>>> g.next()
1
>>> g.next()
2
>>> g.next()
3
>>> g.send('restart')
0
>>> g.next()
1
>>> g.next()
2
可能最简单的解决方案是将昂贵的部分包装在一个对象中并将其传递给生成器:
data = ExpensiveSetup()
for x in FunctionWithYield(data): pass
for x in FunctionWithYield(data): pass
这样,您可以缓存昂贵的计算。
如果您可以同时将所有结果保存在 RAM 中,则使用list()
将生成器的结果具体化为一个普通列表并使用它。
我想为老问题提供不同的解决方案
class IterableAdapter:
def __init__(self, iterator_factory):
self.iterator_factory = iterator_factory
def __iter__(self):
return self.iterator_factory()
squares = IterableAdapter(lambda: (x * x for x in range(5)))
for x in squares: print(x)
for x in squares: print(x)
与类似的东西相比,这样做的好处list(iterator)
是,这是O(1)
空间复杂度,并且list(iterator)
是O(n)
. 缺点是,如果你只能访问迭代器,而不能访问生成迭代器的函数,那么你不能使用这种方法。例如,执行以下操作似乎很合理,但它不起作用。
g = (x * x for x in range(5))
squares = IterableAdapter(lambda: g)
for x in squares: print(x)
for x in squares: print(x)
StopIteration
您可以为生成器生成函数编写一个简单的包装器函数,以跟踪生成器何时耗尽。它将使用StopIteration
生成器在迭代结束时抛出的异常来执行此操作。
import types
def generator_wrapper(function=None, **kwargs):
assert function is not None, "Please supply a function"
def inner_func(function=function, **kwargs):
generator = function(**kwargs)
assert isinstance(generator, types.GeneratorType), "Invalid function"
try:
yield next(generator)
except StopIteration:
generator = function(**kwargs)
yield next(generator)
return inner_func
正如您在上面看到的,当我们的包装函数捕获StopIteration
异常时,它只是重新初始化生成器对象(使用函数调用的另一个实例)。
然后,假设你在下面的某个地方定义了你的生成器提供函数,你可以使用 Python 函数装饰器语法来隐式包装它:
@generator_wrapper
def generator_generating_function(**kwargs):
for item in ["a value", "another value"]
yield item
如果 GrzegorzOledzki 的回答还不够,您可能会使用send()
它来实现您的目标。有关增强的生成器和 yield 表达式的更多详细信息,请参阅PEP-0342。
更新:另见itertools.tee()
。它涉及上面提到的一些内存与处理权衡,但它可能会节省一些内存,而不是仅将生成器结果存储在list
; 这取决于您如何使用生成器。
如果您的生成器在某种意义上是纯粹的,它的输出仅取决于传递的参数和步骤号,并且您希望生成的生成器可以重新启动,那么这里有一个可能很方便的排序片段:
import copy
def generator(i):
yield from range(i)
g = generator(10)
print(list(g))
print(list(g))
class GeneratorRestartHandler(object):
def __init__(self, gen_func, argv, kwargv):
self.gen_func = gen_func
self.argv = copy.copy(argv)
self.kwargv = copy.copy(kwargv)
self.local_copy = iter(self)
def __iter__(self):
return self.gen_func(*self.argv, **self.kwargv)
def __next__(self):
return next(self.local_copy)
def restartable(g_func: callable) -> callable:
def tmp(*argv, **kwargv):
return GeneratorRestartHandler(g_func, argv, kwargv)
return tmp
@restartable
def generator2(i):
yield from range(i)
g = generator2(10)
print(next(g))
print(list(g))
print(list(g))
print(next(g))
输出:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[]
0
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
1
来自tee 的官方文档:
一般来说,如果一个迭代器在另一个迭代器启动之前使用了大部分或全部数据,那么使用 list() 而不是 tee() 会更快。
所以最好list(iterable)
在你的情况下使用。
您可以定义一个返回生成器的函数
def f():
def FunctionWithYield(generator_args):
code here...
return FunctionWithYield
现在您可以随意执行多次:
for x in f()(generator_args): print(x)
for x in f()(generator_args): print(x)
我不确定你所说的昂贵的准备是什么意思,但我猜你实际上有
data = ... # Expensive computation
y = FunctionWithYield(data)
for x in y: print(x)
#here must be something to reset 'y'
# this is expensive - data = ... # Expensive computation
# y = FunctionWithYield(data)
for x in y: print(x)
如果是这样,为什么不重用data
?
它对我来说是如何工作的。
csv_rows = my_generator()
for _ in range(10):
for row in csv_rows:
print(row)
csv_rows = my_generator()
您现在可以使用more_itertools.seekable
启用重置迭代器的(第三方工具)。
通过安装> pip install more_itertools
import more_itertools as mit
y = mit.seekable(FunctionWithYield())
for x in y:
print(x)
y.seek(0) # reset iterator
for x in y:
print(x)
注意:随着迭代器的推进,内存消耗会增加,所以要警惕大的迭代器。
您可以通过使用itertools.cycle()来做到这一点, 您可以使用此方法创建一个迭代器,然后在迭代器上执行一个 for 循环,该迭代器将循环其值。
例如:
def generator():
for j in cycle([i for i in range(5)]):
yield j
gen = generator()
for i in range(20):
print(next(gen))
将生成 20 个数字,0 到 4 重复。
来自文档的注释:
Note, this member of the toolkit may require significant auxiliary storage (depending on the length of the iterable).
没有重置迭代器的选项。next()
迭代器通常在迭代函数时弹出。唯一的方法是在迭代迭代器对象之前进行备份。检查下面。
使用项目 0 到 9 创建迭代器对象
i=iter(range(10))
遍历将弹出的 next() 函数
print(next(i))
将迭代器对象转换为列表
L=list(i)
print(L)
output: [1, 2, 3, 4, 5, 6, 7, 8, 9]
所以项目0已经弹出。当我们将迭代器转换为列表时,所有项目都会弹出。
next(L)
Traceback (most recent call last):
File "<pyshell#129>", line 1, in <module>
next(L)
StopIteration
因此,您需要在开始迭代之前将迭代器转换为列表以进行备份。列表可以转换为迭代器iter(<list-object>)
好吧,你说你想多次调用一个生成器,但是初始化很昂贵......这样的东西呢?
class InitializedFunctionWithYield(object):
def __init__(self):
# do expensive initialization
self.start = 5
def __call__(self, *args, **kwargs):
# do cheap iteration
for i in xrange(5):
yield self.start + i
y = InitializedFunctionWithYield()
for x in y():
print x
for x in y():
print x
或者,您可以创建自己的遵循迭代器协议并定义某种“重置”功能的类。
class MyIterator(object):
def __init__(self):
self.reset()
def reset(self):
self.i = 5
def __iter__(self):
return self
def next(self):
i = self.i
if i > 0:
self.i -= 1
return i
else:
raise StopIteration()
my_iterator = MyIterator()
for x in my_iterator:
print x
print 'resetting...'
my_iterator.reset()
for x in my_iterator:
print x
https://docs.python.org/2/library/stdtypes.html#iterator-types http://anandology.com/python-practice-book/iterators.html
我的回答解决了稍微不同的问题:如果生成器的初始化成本很高,并且每个生成的对象的生成成本很高。但是我们需要在多个函数中多次使用生成器。为了准确地调用生成器和每个生成的对象,我们可以使用线程并在不同的线程中运行每个使用方法。由于 GIL,我们可能无法实现真正的并行性,但我们会实现我们的目标。
这种方法在以下情况下做得很好:深度学习模型处理了大量图像。结果是图像上的很多对象都有很多蒙版。每个掩码都会消耗内存。我们有大约 10 种方法可以制作不同的统计数据和指标,但它们会同时获取所有图像。所有图像都无法放入内存。这些方法可以很容易地重写以接受迭代器。
class GeneratorSplitter:
'''
Split a generator object into multiple generators which will be sincronised. Each call to each of the sub generators will cause only one call in the input generator. This way multiple methods on threads can iterate the input generator , and the generator will cycled only once.
'''
def __init__(self, gen):
self.gen = gen
self.consumers: List[GeneratorSplitter.InnerGen] = []
self.thread: threading.Thread = None
self.value = None
self.finished = False
self.exception = None
def GetConsumer(self):
# Returns a generator object.
cons = self.InnerGen(self)
self.consumers.append(cons)
return cons
def _Work(self):
try:
for d in self.gen:
for cons in self.consumers:
cons.consumed.wait()
cons.consumed.clear()
self.value = d
for cons in self.consumers:
cons.readyToRead.set()
for cons in self.consumers:
cons.consumed.wait()
self.finished = True
for cons in self.consumers:
cons.readyToRead.set()
except Exception as ex:
self.exception = ex
for cons in self.consumers:
cons.readyToRead.set()
def Start(self):
self.thread = threading.Thread(target=self._Work)
self.thread.start()
class InnerGen:
def __init__(self, parent: "GeneratorSplitter"):
self.parent: "GeneratorSplitter" = parent
self.readyToRead: threading.Event = threading.Event()
self.consumed: threading.Event = threading.Event()
self.consumed.set()
def __iter__(self):
return self
def __next__(self):
self.readyToRead.wait()
self.readyToRead.clear()
if self.parent.finished:
raise StopIteration()
if self.parent.exception:
raise self.parent.exception
val = self.parent.value
self.consumed.set()
return val
用法:
genSplitter = GeneratorSplitter(expensiveGenerator)
metrics={}
executor = ThreadPoolExecutor(max_workers=3)
f1 = executor.submit(mean,genSplitter.GetConsumer())
f2 = executor.submit(max,genSplitter.GetConsumer())
f3 = executor.submit(someFancyMetric,genSplitter.GetConsumer())
genSplitter.Start()
metrics.update(f1.result())
metrics.update(f2.result())
metrics.update(f3.result())
它可以通过代码对象来完成。这是示例。
code_str="y=(a for a in [1,2,3,4])"
code1=compile(code_str,'<string>','single')
exec(code1)
for i in y: print i
1 2 3 4
for i in y: print i
exec(code1)
for i in y: print i
1 2 3 4