我正在使用 Python 的 boto 库配合多线程,从 S3 快速下载大量文件。我在程序中多次使用这种方式,效果都很好。但是,其中有一个步骤无法正常工作。在该步骤中,我尝试在 32 核机器(Amazon EC2 cc2.8xlarge)上下载 3,000 个文件。
下面的代码实际上能成功下载每个文件(除了偶尔会出现重试也无法修复的 httplib.IncompleteRead 错误)。然而,32 个线程中只有 10 个左右真正终止,之后程序就挂起了。我不知道为什么会这样:所有文件都已下载完毕,所有线程都应该退出才对。在其他步骤中下载较少的文件时,线程都能正常退出。我现在只能退而求其次,用单个线程下载所有这些文件(这样可行,但速度非常慢)。任何见解都将不胜感激!
from boto.ec2.connection import EC2Connection
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from boto.exception import BotoClientError
from socket import error as socket_error
from httplib import IncompleteRead
import multiprocessing
from time import sleep
import os
import Queue
import threading
def download_to_dir(keys, dir):
    """
    Download a collection of S3 keys into a local directory using threads.

    One worker thread is started per CPU core (``multiprocessing.cpu_count()``).
    Every worker opens its own S3 connection and pulls jobs from a shared
    queue until it receives an end-of-queue marker.

    Args:
        keys: iterable of boto S3 keys; each must expose ``key.key`` (the
            object name) and ``key.bucket.name``. It is iterated twice, so it
            must be re-iterable (e.g. a list).
        dir: local directory under which files are written. Each file is
            stored at ``os.path.join(dir, key.key)``; missing parent
            directories are created.

    Returns:
        A list of local filenames in the same order as ``keys``. An entry is
        ``None`` if that key could not be downloaded; the failure is reported
        on stderr by the worker that hit it, and the worker then moves on to
        the next job instead of dying.

    NOTE(review): no socket timeout is configured here, so a transfer that
    stalls inside boto would block its worker -- and therefore ``join()`` --
    indefinitely. Confirm boto's ``http_socket_timeout`` setting if workers
    appear to hang.
    """
    import traceback  # function-scope import: only used to report job failures

    RETRIES = 5  # attempt each download at most 5 times
    filenames = [None for k in keys]

    class DownloadThread(threading.Thread):
        """Worker that downloads queued keys over its own S3 connection."""

        def __init__(self, queue, dir):
            threading.Thread.__init__(self)
            # every worker gets a private connection instead of sharing one
            self.conn = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
            self.dir = dir
            self.__queue = queue
            # bucket name -> bucket object bound to self.conn, so the bucket
            # lookup happens once per worker rather than once per key
            self.__buckets = {}

        def run(self):
            """Process jobs until an end-of-queue marker is received."""
            while True:
                key_dict = self.__queue.get()
                print("%s %s" % (self, key_dict))
                if key_dict is None:
                    print("DOWNLOAD THREAD FINISHED")
                    break
                if key_dict == 'DONE':  # final marker placed on the queue
                    print("DOWNLOADING DONE")
                    break
                try:
                    self._download(key_dict['idx'], key_dict['key'])
                except Exception:
                    # Thread boundary: report the failure and keep serving the
                    # queue. Letting the exception escape would kill this
                    # worker and leave fewer threads for the remaining keys.
                    traceback.print_exc()

        def _download(self, index, key):
            """Fetch one key to disk and record its filename at ``index``."""
            bucket_name = key.bucket.name
            bucket = self.__buckets.get(bucket_name)
            if bucket is None:
                bucket = self.conn.get_bucket(bucket_name)
                self.__buckets[bucket_name] = bucket

            k = Key(bucket)  # clone the key so it uses this worker's connection
            k.key = key.key
            filename = os.path.join(self.dir, k.key)

            # make the parent directories if they don't exist yet
            f_dirname = os.path.dirname(filename)
            if f_dirname and not os.path.isdir(f_dirname):
                try:
                    os.makedirs(f_dirname)
                except OSError:
                    # Another worker may have created it in the meantime;
                    # anything else (e.g. permissions) is a real error.
                    if not os.path.isdir(f_dirname):
                        raise

            # inspired by: http://code.google.com/p/s3funnel/source/browse/trunk/scripts/s3funnel?r=10
            wait = 1
            for attempt in range(RETRIES):
                try:
                    k.get_contents_to_filename(filename)
                    break
                except (IncompleteRead, socket_error, BotoClientError) as e:
                    if attempt == RETRIES - 1:  # failed final attempt
                        raise Exception('FAILED TO DOWNLOAD %s, %s' % (k, e))
                    # exponential back-off before the next attempt
                    wait *= 2
                    sleep(wait)

            # put the filename in the slot matching the key's position
            filenames[index] = filename

    num_cores = multiprocessing.cpu_count()

    q = Queue.Queue(0)
    for i, k in enumerate(keys):
        q.put({'idx': i, 'key': k})
    # One stop marker per worker: num_cores - 1 plain markers plus a final
    # 'DONE', so every thread receives exactly one and exits its loop.
    for i in range(num_cores - 1):
        q.put(None)
    q.put('DONE')

    # Spin up all the workers
    workers = [DownloadThread(q, dir) for i in range(num_cores)]
    for worker in workers:
        worker.start()

    # Block the main thread until every worker has exited
    for worker in workers:
        worker.join()

    return filenames