
Take this code as an example:

import hashlib
import os

def get_hash(path, hash_type='md5'):
    # look up the requested hash constructor (e.g. hashlib.md5) and instantiate it
    func = getattr(hashlib, hash_type)()
    f = os.open(path, (os.O_RDWR | os.O_BINARY))  # os.O_BINARY exists on Windows only
    # read the file in blocks until os.read() returns an empty bytes object
    for block in iter(lambda: os.read(f, 1024*func.block_size), b''):
        func.update(block)
    os.close(f)
    return func.hexdigest()

This function returns the md5sum of any file. Say I have a directory with 30+ files in it and I want to run the hashing function against each of them:

def hasher(path=some_path):
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            path = os.path.join(root, name)
            yield get_hash(path)
@some_timer_decorator
... some testing function here ...

test1 took 4.684999942779541 seconds.

Now, as you can see, the situation at hand gives me the opportunity to "exploit" the hasher function and add multiprocessing:

import multiprocessing

def hasher_parallel(path=PATH):
    p = multiprocessing.Pool(3)
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            full_name = os.path.join(root, name)
            yield p.apply_async(get_hash, (full_name,)).get()
@some_timer_decorator
... some other testing function here ...

test2 took 4.781000137329102 seconds.

The output is identical. I was expecting the parallel version to be faster, since most of the files are smaller than 20 MB and the hasher function computes those sums very quickly (in general, for files of that size). Is there something wrong with my implementation? If not, is there a faster way to solve the same problem?


This is the decorator function I used to measure the execution times:

import time

def hasher_time(f):
    def f_timer(*args, **kwargs):
        start = time.time()
        result = f(*args, **kwargs)
        end = time.time()
        print(f.__name__, 'took', end - start, 'seconds')
        return result
    return f_timer
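
The actual test functions are elided above; roughly, they just apply this decorator to calls of hasher and hasher_parallel. A sketch of what they might look like (wrapping the calls in list() is only one way to force the generators to run, and is an assumption here):

@hasher_time
def test1():
    # exhaust the generator so every file actually gets hashed
    return list(hasher())

@hasher_time
def test2():
    return list(hasher_parallel())

test1()
test2()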

1 Answer


You are pushing out the jobs, then immediately waiting for each of them to complete:

yield p.apply_async(get_hash, (full_name,)).get()

The AsyncResult.get() method blocks until the job is complete, so you are effectively running your jobs sequentially.

Collect the jobs first, poll them with AsyncResult.ready() until they are done, and only then .get() the results.
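
For illustration, a rough sketch of that polling approach (the function name hasher_parallel_polled and the 10 ms sleep interval are my own choices here):

import multiprocessing
import os
import time

def hasher_parallel_polled(path=PATH):
    p = multiprocessing.Pool(3)
    # submit every file as a job without waiting on any of them
    jobs = [p.apply_async(get_hash, (os.path.join(root, name),))
            for root, dirs, files in os.walk(path, topdown=False)
            for name in files]
    p.close()  # no more jobs will be submitted
    # poll until every job reports ready(), then collect the results in order
    while not all(job.ready() for job in jobs):
        time.sleep(0.01)
    for job in jobs:
        yield job.get()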

Better still, push all the jobs into the pool with apply_async() calls, close the pool, call .join() (which blocks until all the jobs are complete), and then retrieve the results with .get():

def hasher_parallel(path=PATH):
    p = multiprocessing.Pool(3)
    jobs = []
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            full_name = os.path.join(root, name)
            jobs.append(p.apply_async(get_hash, (full_name,)))
    p.close()
    p.join()  # wait for jobs to complete
    for job in jobs:
        yield job.get()

You could simplify the code somewhat by using the Pool.imap() method; it yields results as they become available:

def hasher_parallel(path=PATH):
    p = multiprocessing.Pool(3)
    filenames = (os.path.join(root, name)
        for root, dirs, files in os.walk(path, topdown=False)
        for name in files)
    for result in p.imap(get_hash, filenames):
        yield result

But do experiment with the chunksize parameter and with the unordered variant, Pool.imap_unordered().
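
For example, a sketch of the unordered variant with a chunksize (the value of 10 is only a starting point to tune against your own file count and sizes):

import multiprocessing
import os

def hasher_parallel_unordered(path=PATH):
    p = multiprocessing.Pool(3)
    filenames = (os.path.join(root, name)
        for root, dirs, files in os.walk(path, topdown=False)
        for name in files)
    # results are yielded in completion order rather than submission order;
    # a larger chunksize reduces the per-task IPC overhead
    for result in p.imap_unordered(get_hash, filenames, chunksize=10):
        yield result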

answered 2014-03-30T00:53:40.750