View.map
如果不先遍历整个生成器,就不能与生成器一起使用。但是您可以编写自己的自定义函数来从生成器提交批量任务并逐步等待它们。我没有更有趣的例子,但我可以用质数搜索的糟糕实现来说明。
从我们的令牌“数据生成器”开始:
from math import sqrt
def generate_possible_factors(N):
"""generator for iterating through possible factors for N
yields 2, every odd integer <= sqrt(N)
"""
if N <= 3:
return
yield 2
f = 3
last = int(sqrt(N))
while f <= last:
yield f
f += 2
这只是生成一个整数序列,以便在测试一个数字是否为素数时使用。
现在我们将用作任务的琐碎函数IPython.parallel
def is_factor(f, N):
"""is f a factor of N?"""
return (N % f) == 0
以及使用生成器和我们的因子函数的素数检查的完整实现:
def dumb_prime(N):
"""dumb implementation of is N prime?"""
for f in generate_possible_factors(N):
if is_factor(f, N):
return False
return True
一次只提交有限数量的任务的并行版本:
def parallel_dumb_prime(N, v, max_outstanding=10, dt=0.1):
"""dumb_prime where each factor is checked remotely
Up to `max_outstanding` factors will be checked in parallel.
Submission will halt as soon as we know that N is not prime.
"""
tasks = set()
# factors is a generator
factors = generate_possible_factors(N)
while True:
try:
# submit a batch of tasks, with a maximum of `max_outstanding`
for i in range(max_outstanding-len(tasks)):
f = factors.next()
tasks.add(v.apply_async(is_factor, f, N))
except StopIteration:
# no more factors to test, stop submitting
break
# get the tasks that are done
ready = set(task for task in tasks if task.ready())
while not ready:
# wait a little bit for some tasks to finish
v.wait(tasks, timeout=dt)
ready = set(task for task in tasks if task.ready())
for t in ready:
# get the result - if True, N is not prime, we are done
if t.get():
return False
# update tasks to only those that are still pending,
# and submit the next batch
tasks.difference_update(ready)
# check the last few outstanding tasks
for task in tasks:
if t.get():
return False
# checked all candidates, none are factors, so N is prime
return True
这一次提交有限数量的任务,一旦我们知道 N 不是素数,我们就停止使用生成器。
要使用此功能:
from IPython import parallel
rc = parallel.Client()
view = rc.load_balanced_view()
for N in range(900,1000):
if parallel_dumb_prime(N, view, 10):
print N
笔记本中的更完整的插图。