1

在 Python 中,我有一个函数,它接受一个 URL 列表(url_list)并添加标题。该列表最多可以有 25,000 个 url,所以我想尝试使用多处理。我尝试了以下代码,但我认为由于加入,它并没有真正进行多重处理。我怎样才能使这个真正的多重处理?

def do_it(url_list, headers):
    for i in url_list:
        print "adding header to: \n" + i
        requests.post(i, headers=headers)
        print "done!"



  value = raw_input("Proceed? Enter [Y] for yes: ")
    if value == "Y":
        p = Process(target=do_it, args = (url_list, headers))
        p.start()
        p.join()
    else:
        print "Operation Failed!"
4

1 回答 1

1

您绝对不想创建每个处理 1 个 URL 的 25000 个子进程。但是您也不想要 1 个处理 25000 的进程(这是您构建的)。例如,您可能想要 8 个进程,每个进程处理大约 25000 的 1/8。

您可以通过创建一个Queue完整的 URL、创建 8 个进程来循环通过拉下一个 URL 并完成工作来为该队列提供服务,然后加入所有 8 个进程来做到这一点。

但是你要做的是建立一个进程池。并且已经内置了一个multiprocessing。除了已经构建和调试之外,它还具有您可能不会想到的功能。它还可以以各种不同的方式将结果传回。它可以让你一次分块成批的几个 URL(如果你给每个进程一个完整的列表的 1/8,那么就没有负载平衡;如果你一次给每个进程 1 个,你就是在浪费时间- 可以用于实际工作的进程通信)。等等。

所以,让我们使用它:

def do_it(url, headers):
    print "adding header to: \n" + i
    requests.post(i, headers=headers)
    print "done!"

pool = multiprocessing.Pool(max_workers=8)
results = pool.map(lambda url: do_it(url, headers), urls)
pool.join()

这段代码唯一真正的问题是你在等待它建立一个包含 25000 个结果的列表,这些结果都是None. 还有其他方法可以等待它而不建立返回结果,但实际上,这个列表的成本不值得处理它的额外复杂性。


我在那里使用 a 的原因lambda是你需要一个只接受 each 的函数url,而你只有一个接受 eachurl 加上一个headers参数的函数。lambda您可以通过使用or定义新包装器来创建该函数def,或者通过调用为您喜欢的高阶函数来创建该函数partial。这些基本上是等价的:

results = pool.map(lambda url: do_it(url, headers), urls)

def wrapper(url):
    return do_it(url, headers)
results = pool.map(wrapper, urls)

results = pool.map(partial(do_it, headers=headers), urls)

您可能还需要考虑使用 anExecutor而不是普通池。AnExecutor返回更智能的结果对象,称为Futures,在许多情况下更容易处理。(例如,不必在 中的四个不同map变体之间做出决定Pool,您只需在简单submit方法上创建一个理解,然后在结果s 上调用as_completedor 。)由于这直到 3.2 才添加到 Python 中,对于 2 .x 您必须安装 backport 库才能使用它。无论如何,对于像这样的微不足道的情况,它不会有太大的不同:waitFuturefutures

with futures.ProcessPoolExecutor(max_workers=8) as executor:
    results = executor.map(lambda url: do_it(url, headers), urls)

… 或者:

with futures.ProcessPoolExecutor(max_workers=8) as executor:
    fs = [executor.submit(do_it, url, headers) for url in urls]
    futures.wait(fs)
于 2013-08-27T19:04:46.827 回答