1

我在远程服务器上有一些拆分文件。

我已经尝试一一下载并加入他们。但这需要很多时间。我搜索了一下,发现同时下载可能会加快速度。该脚本在 Python 上。

我的伪是这样的:

url1 = something
url2 = something
url3 = something

data1 = download(url1)
data2 = download(url2)
data3 = download(url3)

wait for all download to complete
join all data and save

谁能指出我可以同时加载所有文件并等待它们完成的方向。

我已经尝试过创建一个类。但是我又不知道如何等到全部完成。

我对线程和队列功能更感兴趣,我可以将它们导入我的平台。

我已经尝试使用 Thread 和 Queue 并在此站点上找到了一个示例。这是代码 pastebin.com/KkiMLTqR 。但它不会等待或永远等待......不确定

4

2 回答 2

5

有两种方法可以同时做事。或者,真的,2-3/4 左右:

  • 多线程
    • 或多个进程,特别是如果“事物”占用大量 CPU 资源
    • 或协程或greenlets,特别是如果有成千上万的“事物”
    • 或以上之一的池
  • 事件循环(手动编码)
    • 或混合 greenlet/事件循环系统,如gevent.

如果您有 1000 个 URL,您可能不想同时执行 1000 个请求。例如,Web 浏览器通常一次只执行 8 个请求。池是一次只做 8 件事的好方法,所以让我们这样做。

而且,由于您一次只做 8 件事情,而且这些事情主要受 I/O 限制,因此线程是完美的。


我会用futures. (如果您使用的是 Python 2.x 或 3.0-3.1,则需要安装 backport,. futures

import concurrent.futures

urls = ['http://example.com/foo', 
        'http://example.com/bar']

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    result = b''.join(executor.map(download, urls))

with open('output_file', 'wb') as f:
    f.write(result)

当然,您需要编写download函数,但这与您一次编写这些函数时编写的函数完全相同。

例如,使用urlopen(如果您使用的是 Python 2.x,请使用urllib2代替urllib.request):

def download(url):
    with urllib.request.urlopen(url) as f:
        return f.read()

如果您想学习如何自己构建线程池执行器,源代码实际上非常简单,并且multiprocessing.pool是 stdlib 中的另一个很好的示例。

然而,这两者都有很多多余的代码(处理弱引用以提高内存使用率、干净地关闭、提供不同的等待结果的方式、正确传播异常等)可能会妨碍您。

如果您环顾 PyPI 和 ActiveState,您会发现更简单的设计threadpool,您可能会发现更容易理解。

但这是最简单的可连接线程池:

class ThreadPool(object):
    def __init__(self, max_workers):
        self.queue = queue.Queue()
        self.workers = [threading.Thread(target=self._worker) for _ in range(max_workers)]
    def start(self):
        for worker in self.workers:
            worker.start()
    def stop(self):
        for _ in range(self.workers):
            self.queue.put(None)
        for worker in self.workers:
            worker.join()
    def submit(self, job):
        self.queue.put(job)
    def _worker(self):
        while True:
            job = self.queue.get()
            if job is None:
                break
            job()

当然,一个非常简单的实现的缺点是它的使用不友好concurrent.futures.ThreadPoolExecutor

urls = ['http://example.com/foo', 
        'http://example.com/bar']
results = [list() for _ in urls]
results_lock = threading.Lock()

def download(url, i):
    with urllib.request.urlopen(url) as f:
        result = f.read()
    with results_lock:
        results[i] = url

pool = ThreadPool(max_workers=8)
pool.start()
for i, url in enumerate(urls):
    pool.submit(functools.partial(download, url, i))
pool.stop()

result = b''.join(results)

with open('output_file', 'wb') as f:
    f.write(result)
于 2013-06-20T00:31:16.443 回答
0

您可以使用像twisted这样的异步框架。

或者,这是 Python 的线程可以做的一件事。由于您主要受 IO 限制

于 2013-06-20T00:32:59.117 回答