这是您想法的另一种实现方式。它使用协作式多线程。正如您所建议的,关键在于使用多线程,并让迭代器的 `__next__` 方法阻塞,直到所有线程都消费完当前元素(启用缓冲时,则是当前缓冲块中的全部元素)。
此外,迭代器还包含一个(可选的)固定大小的缓冲区。有了这个缓冲区,我们可以分块读取源迭代器(source)中的元素,从而避免大量的加锁/同步操作。
我的实现还处理了一些消费者在到达迭代器末尾之前停止迭代的情况。
import threading
class BufferedMultiIter:
    '''Iterator that presents every element of one source to `n` threads.

    The source is read in chunks of `bufsize` elements into a shared buffer.
    Each registered thread walks the buffer with its own (thread-local)
    cursor. A thread that has consumed the whole chunk blocks in `__next__`
    until every other registered thread has done the same; the last thread
    to arrive refills the buffer and wakes the others.
    '''

    def __init__(self, source, n, bufsize = 1):
        '''`source` is an iterator or iterable,
        `n` is the number of threads that will interact with this iterator,
        `bufsize` is the size of the internal buffer. The iterator will read
        and buffer elements from `source` in chunks of `bufsize`. The bigger
        the buffer is, the better the performance but also the bigger the
        (constant) space requirement. Values below 1 are treated as 1.
        '''
        self._source = iter(source)
        self._n = n
        # Condition variable for synchronization; it also guards every
        # shared state variable below.
        self._cond = threading.Condition()
        # Buffered values: `_buffer[:_buffered]` is the current chunk.
        bufsize = max(bufsize, 1)
        self._buffer = [None] * bufsize
        self._buffered = 0
        # Per-thread read cursor into the buffer (attribute `i`).
        self._next = threading.local()
        # State variables to implement the "wait for buffer to get refilled"
        # protocol: `_serial` is bumped on every refill, `_waiting` counts
        # the threads that have finished the current chunk.
        self._serial = 0
        self._waiting = 0
        # True if we reached the end of the source
        self._stop = False
        # Was the iterator killed (for error handling)?
        self._killed = False

    def _fill_buffer(self):
        '''Refill the internal buffer.

        Must be called with `self._cond` held. If the source raises anything
        other than `StopIteration`, the iterator is marked as killed, all
        waiting threads are woken up (they will raise `StopIteration`) and
        the exception is re-raised in the calling thread.
        '''
        self._buffered = 0
        try:
            # Do not touch the source again once it is exhausted or the
            # iterator was killed.
            while not self._stop and self._buffered < len(self._buffer):
                try:
                    self._buffer[self._buffered] = next(self._source)
                except StopIteration:
                    self._stop = True
                else:
                    self._buffered += 1
        except BaseException:
            # Without this, every thread blocked in `__next__` would wait
            # forever for a refill that never completes.
            self._buffered = 0
            for i in range(len(self._buffer)):
                self._buffer[i] = None
            self._killed = True
            self._stop = True
            self._cond.notify_all()
            raise
        # Explicitly clear the unused part of the buffer to release
        # references as early as possible
        for i in range(self._buffered, len(self._buffer)):
            self._buffer[i] = None
        self._waiting = 0
        self._serial += 1

    def register_thread(self):
        '''Register a thread.

        Each thread that wants to access this iterator must first register
        with the iterator. It is an error to register the same thread more
        than once. It is an error to access this iterator with a thread that
        was not registered (with the exception of calling `kill`). It is an
        error to register more threads than the number that was passed to the
        constructor.
        '''
        self._next.i = 0

    def unregister_thread(self):
        '''Unregister a thread from this iterator.

        This should be called when a thread is done using the iterator.
        It catches the case in which a consumer does not consume all the
        elements from the iterator but exits early.
        '''
        assert hasattr(self._next, 'i')
        delattr(self._next, 'i')
        with self._cond:
            assert self._n > 0
            self._n -= 1
            # If everybody who is still registered is already waiting, the
            # leaving thread has to trigger the refill on their behalf.
            # When nobody is left (`_n == 0`) there is no one to refill for,
            # so the source must not be advanced any further.
            if self._n > 0 and self._waiting == self._n:
                self._fill_buffer()
                self._cond.notify_all()

    def kill(self):
        '''Forcibly kill this iterator.

        This will wake up all threads currently blocked in `__next__` and
        will have them raise a `StopIteration`.
        This function should be called in case of error to terminate all
        threads as fast as possible.
        '''
        with self._cond:
            self._killed = True
            self._stop = True
            self._cond.notify_all()

    def __iter__(self): return self

    def __next__(self):
        '''Return the next element for the calling (registered) thread.

        Blocks at the end of a chunk until all registered threads have
        consumed it. Raises `StopIteration` at the end of the source or
        after the iterator was killed.
        '''
        # `>=` rather than `==`: after a failed refill the buffer is empty
        # while this thread's cursor may still point past it.
        if self._next.i >= self._buffered:
            # We read everything from the buffer.
            # Wait until all other threads have also consumed the buffer
            # completely and then refill it.
            with self._cond:
                old = self._serial
                self._waiting += 1
                if self._waiting == self._n:
                    self._fill_buffer()
                    self._cond.notify_all()
                else:
                    # Wait until the serial number changes. A change in
                    # serial number indicates that another thread has filled
                    # the buffer
                    while self._serial == old and not self._killed:
                        self._cond.wait()
                # Start at beginning of newly filled buffer
                self._next.i = 0
                if self._killed:
                    raise StopIteration
        k = self._next.i
        if k == self._buffered and self._stop:
            raise StopIteration
        value = self._buffer[k]
        self._next.i = k + 1
        return value
class NotAll:
    '''Consumer that stops early instead of draining its input.

    The instance keeps a running count of elements it has taken across all
    calls. The limit is checked only after an element has been taken, so a
    call always takes at least one element from a non-empty input.
    '''

    def __init__(self, limit):
        # Nothing consumed yet; `limit` is the count at which we stop.
        self._consumed = 0
        self._limit = limit

    def __call__(self, it):
        '''Iterate over `it` until the limit is hit; return the last element
        taken, or `None` if `it` yielded nothing.'''
        latest = None
        for latest in it:
            self._consumed += 1
            if self._consumed >= self._limit:
                break
        return latest
def multi_iter(iterable, *consumers, **kwargs):
    '''Iterate using multiple consumers.

    Each value in `iterable` is presented to each of the `consumers`.
    The function returns a tuple with the results of all `consumers`, in
    the order the consumers were passed. The slot of a consumer that raised
    an exception is left as `None`.
    There is an optional `bufsize` argument. This controls the internal
    buffer size. The bigger the buffer, the better the performance, but also
    the bigger the (constant) space requirement of the operation.
    If starting the threads fails, the iterator is killed, the threads that
    were already started are joined and the error is re-raised.
    NOTE: This will spawn a new thread for each consumer! The iteration is
    multi-threaded and happens in parallel for each element.
    '''
    n = len(consumers)
    it = BufferedMultiIter(iterable, n, kwargs.get('bufsize', 1))
    threads = []  # List with **running** threads
    result = [None] * n

    def thread_func(i, c):
        it.register_thread()
        try:
            result[i] = c(it)
        finally:
            # Always unregister, even if the consumer raised: the other
            # consumers would otherwise wait forever for this thread.
            it.unregister_thread()

    try:
        for i, c in enumerate(consumers):
            t = threading.Thread(target = thread_func, args = (i, c))
            t.start()
            threads.append(t)
    except BaseException:
        # Here we should forcibly kill all the threads but there is no
        # t.kill() function or similar. So the best we can do is stop the
        # iterator; the error itself is passed on to the caller.
        it.kill()
        raise
    finally:
        # Join in reverse start order, whether or not startup succeeded.
        while threads:
            threads.pop().join()
    return tuple(result)
from time import time
# Benchmark: compare five independent passes over range(N) with a single
# shared pass driven by multi_iter at buffer sizes 1, 2, 4, 8 and 16.
N = 10 ** 7
notall1 = NotAll(1)
notall1000 = NotAll(1000)

# Baseline: every consumer gets its own fresh range(N).
start1 = time()
res1 = (
    min(range(N)),
    max(range(N)),
    sum(range(N)),
    NotAll(1)(range(N)),
    NotAll(1000)(range(N)),
)
stop1 = time()
print('5 iterators: %s %.2f' % (str(res1), stop1 - start1))

# Shared iteration: one range(N) fanned out to all five consumers.
for p in range(5):
    start2 = time()
    res2 = multi_iter(
        range(N),
        min, max, sum, NotAll(1), NotAll(1000),
        bufsize = 1 << p,
    )
    stop2 = time()
    print('multi_iter%d: %s %.2f' % (p, str(res2), stop2 - start2))
耗时依然很糟糕,但可以看到,使用固定大小的缓冲区能够显著改善性能:
5 iterators: (0, 9999999, 49999995000000, 0, 999) 0.71
multi_iter0: (0, 9999999, 49999995000000, 0, 999) 342.36
multi_iter1: (0, 9999999, 49999995000000, 0, 999) 264.71
multi_iter2: (0, 9999999, 49999995000000, 0, 999) 151.06
multi_iter3: (0, 9999999, 49999995000000, 0, 999) 95.79
multi_iter4: (0, 9999999, 49999995000000, 0, 999) 72.79
也许这可以为更好的实现提供一些思路。