1

我正在使用 python boto 和线程从 S3 快速下载许多文件。我在我的程序中多次使用它,效果很好。但是,有一次它不起作用。在该步骤中,我尝试在 32 核机器(Amazon EC2 cc2.8xlarge)上下载 3,000 个文件。

下面的代码实际上成功地下载了每个文件(除了有时会出现重试无法修复的 httplib.IncompleteRead 错误)。然而,32 个线程中只有 10 个左右实际终止,程序就挂起。不知道为什么会这样。所有文件都已下载,所有线程都应该退出。当我下载较少的文件时,他们会执行其他步骤。我已经被简化为使用单个线程下载所有这些文件(这可以工作但速度非常慢)。任何见解将不胜感激!

from boto.ec2.connection import EC2Connection
from boto.s3.connection import S3Connection
from boto.s3.key import Key

from boto.exception import BotoClientError
from socket import error as socket_error
from httplib import IncompleteRead

import multiprocessing
from time import sleep
import os

import Queue
import threading

def download_to_dir(keys, dir):
    """
    Given a list of S3 keys and a local directory filepath,
    downloads the files corresponding to the keys to the local directory.
    Returns a list of filenames.
    """
    filenames = [None for k in keys]

    class DownloadThread(threading.Thread):

        def __init__(self, queue, dir):
            # call to the parent constructor
            threading.Thread.__init__(self)
            # create a connection to S3
            connection = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
            self.conn = connection
            self.dir = dir
            self.__queue = queue

        def run(self):
            while True:
                key_dict = self.__queue.get()
                print self, key_dict
                if key_dict is None:
                    print "DOWNLOAD THREAD FINISHED"
                    break
                elif key_dict == 'DONE': #last job for last worker
                    print "DOWNLOADING DONE"
                    break
                else: #still work to do!
                    index = key_dict.get('idx')
                    key = key_dict.get('key')
                    bucket_name = key.bucket.name
                    bucket = self.conn.get_bucket(bucket_name)
                    k = Key(bucket) #clone key to use new connection
                    k.key = key.key

                    filename = os.path.join(dir, k.key)
                    #make dirs if don't exist yet
                    try:
                        f_dirname = os.path.dirname(filename)
                        if not os.path.exists(f_dirname):
                            os.makedirs(f_dirname)
                    except OSError: #already written to
                        pass

                    #inspired by: http://code.google.com/p/s3funnel/source/browse/trunk/scripts/s3funnel?r=10
                    RETRIES = 5 #attempt at most 5 times
                    wait = 1
                    for i in xrange(RETRIES):
                        try:
                            k.get_contents_to_filename(filename)
                            break
                        except (IncompleteRead, socket_error, BotoClientError), e:
                            if i == RETRIES-1: #failed final attempt
                                raise Exception('FAILED TO DOWNLOAD %s, %s' % (k, e))
                                break
                            wait *= 2
                            sleep(wait)

                    #put filename in right spot!
                    filenames[index] = filename

    num_cores = multiprocessing.cpu_count()

    q = Queue.Queue(0)

    for i, k in enumerate(keys):
        q.put({'idx': i, 'key':k})
    for i in range(num_cores-1):
        q.put(None) # add end-of-queue markers
    q.put('DONE') #to signal absolute end of job

    #Spin up all the workers
    workers = [DownloadThread(q, dir) for i in range(num_cores)]
    for worker in workers:
        worker.start()

    #Block main thread until completion
    for worker in workers:
        worker.join() 

    return filenames
4

2 回答 2

4

升级到 AWS 开发工具包版本 1.4.4.0 或更高版本,或仅使用 2 个线程。旧版本的限制为最多 2 个同时连接。这意味着如果您启动 2 个线程,您的代码将运行良好;如果您启动 3 个或更多,您一定会看到不完整的读取和用尽的超时。

您会看到虽然 2 个线程可以大大提高您的吞吐量,但超过 2 个线程并没有太大变化,因为您的网卡一直都很忙。

于 2012-07-11T22:02:32.263 回答
0

S3Connection 使用 httplib.py 并且该库不是线程安全的,因此确保每个线程都有自己的连接至关重要。看起来你正在这样做。

Boto 已经拥有自己的重试机制,但您在其之上添加了一个重试机制来处理某些其他错误。我想知道是否建议在 except 块内创建一个新的 S3Connection 对象。看起来底层的 http 连接在那个时候可能处于不寻常的状态,最好从一个新的连接开始。

只是一个想法。

于 2012-07-11T23:03:36.927 回答