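I'm trying to use IPython.parallel's map. The inputs to the function I want to parallelize are generators. Because of size/memory constraints, I can't convert the generators to lists. See the code below: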

from itertools import product
from IPython.parallel import Client

c = Client()
v = c[:]
c.ids

def stringcount(longstring, substrings):
    scount = [longstring.count(s) for s in substrings]
    return scount

substrings = product('abc', repeat=2)
longstring = product('abc', repeat=3)

# This is what I want to do in parallel
# It should be 'for longs in longstring'; I use range() because it can get long.
for num in range(10):
    longs = next(longstring)
    subs = next(substrings)
    print(subs, longs)
    count = stringcount(longs, subs)
    print(count)

# This does not work, and I understand why.
# I don't know how to fix it while keeping longstring and substrings as
# generators  
v.map(stringcount, longstring, substrings)

for r in v:
    print(r.get())

2 Answers

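View.map cannot be used with a generator without first walking through the entire generator. But you can write your own custom function that submits batches of tasks from a generator and waits for them incrementally. I don't have a more interesting example, but I can illustrate with a terrible implementation of a prime search.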

Start with our token 'data generator':

from math import sqrt

def generate_possible_factors(N):
    """generator for iterating through possible factors for N

    yields 2, every odd integer <= sqrt(N)
    """
    if N <= 3:
        return
    yield 2
    f = 3
    last = int(sqrt(N))
    while f <= last:
        yield f
        f += 2

This just generates a sequence of integers to use when testing whether a number is prime.

Now the trivial function that we will use as a task with IPython.parallel:

def is_factor(f, N):
    """is f a factor of N?"""
    return (N % f) == 0

And a complete implementation of the prime check using the generator and our factor function:

def dumb_prime(N):
    """dumb implementation of is N prime?"""
    for f in generate_possible_factors(N):
        if is_factor(f, N):
            return False
    return True

A parallel version that only submits a limited number of tasks at a time:

def parallel_dumb_prime(N, v, max_outstanding=10, dt=0.1):
    """dumb_prime where each factor is checked remotely

    Up to `max_outstanding` factors will be checked in parallel.

    Submission will halt as soon as we know that N is not prime.
    """
    tasks = set()
    # factors is a generator
    factors = generate_possible_factors(N)
    while True:
        try:
            # submit a batch of tasks, with a maximum of `max_outstanding`
            for i in range(max_outstanding-len(tasks)):
                f = next(factors)
                tasks.add(v.apply_async(is_factor, f, N))
        except StopIteration:
            # no more factors to test, stop submitting
            break
        # get the tasks that are done
        ready = set(task for task in tasks if task.ready())
        while not ready:
            # wait a little bit for some tasks to finish
            v.wait(tasks, timeout=dt)
            ready = set(task for task in tasks if task.ready())

        for t in ready:
            # get the result - if True, N is not prime, we are done
            if t.get():
                return False
        # update tasks to only those that are still pending,
        # and submit the next batch
        tasks.difference_update(ready)
    # check the last few outstanding tasks
    for task in tasks:
        if task.get():
            return False
    # checked all candidates, none are factors, so N is prime
    return True

This submits a limited number of tasks at a time, and as soon as we know that N is not prime, we stop consuming the generator.
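For anyone who wants to try the same throttling pattern without a running cluster, here is a minimal sketch that uses the standard library's concurrent.futures in place of IPython.parallel. It is not the code above: `possible_factors`, `bounded_prime_check`, and the thread pool are stand-ins assumed for illustration, and threads will not speed up this CPU-bound work. The point is only to show how at most `max_outstanding` items are pulled from the generator at a time.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from itertools import islice
from math import sqrt


def possible_factors(n):
    """Yield 2, then every odd integer <= sqrt(n) (hypothetical stand-in)."""
    if n <= 3:
        return
    yield 2
    f = 3
    last = int(sqrt(n))
    while f <= last:
        yield f
        f += 2


def is_factor(f, n):
    """Is f a factor of n?"""
    return n % f == 0


def bounded_prime_check(n, executor, max_outstanding=10):
    """Check n with at most `max_outstanding` tasks in flight."""
    factors = possible_factors(n)
    pending = set()
    while True:
        # top up the pending set from the generator, never past the limit
        for f in islice(factors, max_outstanding - len(pending)):
            pending.add(executor.submit(is_factor, f, n))
        if not pending:
            # generator exhausted and every task checked: no factor found
            return True
        # block until at least one task has finished
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        if any(task.result() for task in done):
            # a factor was found; the rest of the generator is never consumed
            for task in pending:
                task.cancel()
            return False


with ThreadPoolExecutor(max_workers=4) as executor:
    primes = [n for n in range(900, 1000) if bounded_prime_check(n, executor)]
print(primes)
```

Compared with the polling loop above, `wait(..., return_when=FIRST_COMPLETED)` blocks until something finishes, so no `dt` timeout is needed in this variant.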

To use this function:

from IPython import parallel

rc = parallel.Client()
view = rc.load_balanced_view()

for N in range(900,1000):
    if parallel_dumb_prime(N, view, 10):
        print(N)

A more complete illustration is in a notebook.

answered 2013-08-11T04:20:04.007

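I took a slightly different approach to your problem, which may be useful to others. Below, I try to mimic the behavior of the multiprocessing.pool.Pool.imap method by wrapping IPython.parallel's map. This required me to rewrite your function slightly.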

import IPython.parallel
from itertools import product


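def stringcount(pair):
    # takes a single (longstring, substrings) tuple so it can be mapped
    longstring, substrings = pair
    scount = [longstring.count(s) for s in substrings]
    return (longstring, substrings, scount)

def gen_pairs(long_string, sub_strings):
    # pair items lazily; stop when either generator runs out
    for l in long_string:
        try:
            s = next(sub_strings)
        except StopIteration:
            return
        yield (l, s)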

def imap(function, generator, view, preprocessor=iter, chunksize=256):
    num_cores = len(view.client.ids)
    queue = []
    for n in preprocessor(generator):
        queue.append(n)
        # flush once a full batch has accumulated
        if len(queue) >= chunksize * num_cores:
            for result in view.map(function, queue):
                yield result
            queue = []
    # flush whatever is left over
    if queue:
        for result in view.map(function, queue):
            yield result


client = IPython.parallel.Client()
lbview = client.load_balanced_view()

longstring = product('abc', repeat=3)
substrings = product('abc', repeat=2)

for result in imap(stringcount, gen_pairs(longstring, substrings), lbview):
    print(result)

The output I see is in this notebook: http://nbviewer.ipython.org/gist/driscoll/b8de4bf980de1ad890de
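The chunking idea can also be tried without a cluster. The sketch below is only an illustration under my own assumptions: `chunked_imap` and its `map_fn` parameter are hypothetical names, and the built-in `map` stands in for something like `view.map`. It pulls at most `chunksize` items from the source at a time with `itertools.islice`, so the whole generator is never held in memory.

```python
from itertools import islice, product


def stringcount(pair):
    """Count each substring in longstring; takes one (longstring, substrings) tuple."""
    longstring, substrings = pair
    return [longstring.count(s) for s in substrings]


def chunked_imap(function, iterable, map_fn=map, chunksize=4):
    """Yield function(item) for each item, pulling `chunksize` items at a time.

    `map_fn` is a hypothetical hook: any callable with the signature of the
    built-in map (for example a parallel map) could be passed here.
    """
    iterator = iter(iterable)
    while True:
        # islice consumes at most `chunksize` items from the source
        chunk = list(islice(iterator, chunksize))
        if not chunk:
            return
        # hand one bounded chunk to the mapper and stream its results
        for result in map_fn(function, chunk):
            yield result


# every 3-letter string over 'abc', generated lazily
longstrings = (''.join(p) for p in product('abc', repeat=3))
pairs = ((s, ('a', 'b', 'c')) for s in longstrings)

results = list(chunked_imap(stringcount, pairs))
print(len(results))
```

Results come back in input order because each chunk is fully mapped before the next one is pulled. The trade-off is that workers may sit idle at the end of each chunk.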

answered 2014-11-11T21:42:17.543