0

我的程序的目的是用线程下载文件。我定义了单位,并使用 len/unit 线程,len 是要下载的文件的长度。

使用我的程序,可以下载文件,但线程并没有停止。我找不到原因。

这是我的代码...

#! /usr/bin/python

import urllib2
import threading
import os
from time import ctime

class MyThread(threading.Thread):
 def __init__(self,func,args,name=''):
  threading.Thread.__init__(self);
  self.func = func;
  self.args = args;
  self.name = name;
 def run(self):
  apply(self.func,self.args);

url = 'http://ubuntuone.com/1SHQeCAQWgIjUP2945hkZF';
request = urllib2.Request(url);
response = urllib2.urlopen(request);
meta = response.info();
response.close();
unit = 1000000;
flen = int(meta.getheaders('Content-Length')[0]);
print flen;
if flen%unit == 0:
 bs = flen/unit;
else :
 bs = flen/unit+1;
blocks = range(bs);
cnt = {};
for i in blocks:
 cnt[i]=i;
def getStr(i):
 try:
  print 'Thread %d start.'%(i,);
  fout = open('a.zip','wb');
  fout.seek(i*unit,0);
  if (i+1)*unit > flen:
   request.add_header('Range','bytes=%d-%d'%(i*unit,flen-1));
  else :
   request.add_header('Range','bytes=%d-%d'%(i*unit,(i+1)*unit-1));
  #opener = urllib2.build_opener();
  #buf = opener.open(request).read();
  resp = urllib2.urlopen(request);
  buf = resp.read();
  fout.write(buf);
 except BaseException:
  print 'Error';
 finally :
  #opener.close();
  fout.flush();
  fout.close();
  del cnt[i];
# filelen = os.path.getsize('a.zip');
 print 'Thread %d ended.'%(i),
 print cnt;
# print 'progress : %4.2f'%(filelen*100.0/flen,),'%';
def main():
 print 'download at:',ctime();
 threads = [];
 for i in blocks:
  t = MyThread(getStr,(blocks[i],),getStr.__name__);
  threads.append(t);
 for i in blocks:
  threads[i].start();
 for i in blocks:
#  print 'this is the %d thread;'%(i,);
  threads[i].join();
 #print 'size:',os.path.getsize('a.zip');
 print 'download done at:',ctime();
if __name__=='__main__':
 main();

有人可以帮我理解为什么线程没有停止。

4

1 回答 1

1

我无法真正解决您的代码示例,因为它非常混乱且难以理解,但是您看到线程未结束的一个潜在原因是请求将停止并且永远不会完成。urllib2允许您指定允许请求执行多长时间的超时。

对于您自己的代码,我建议您将工作分成一个队列,启动固定数量的线程(而不是可变数量),然后让工作线程开始工作,直到完成。使 http 请求超时。如果超时到期,请重试或将工作放回队列中。

下面是一个通用示例,说明如何使用队列、固定数量的工作人员以及它们之间的同步原语:

import threading
import time
from Queue import Queue

def worker(queue, results, lock):
    local_results = []
    while True:
        val = queue.get()
        if val is None:
            break

        # pretend to do work
        time.sleep(.1)
        local_results.append(val)

    with lock:
        results.extend(local_results)
        print threading.current_thread().name, "Done!"


num_workers = 4

threads = []
queue = Queue()
lock = threading.Lock()
results = []

for i in xrange(100):
    queue.put(i)

for _ in xrange(num_workers):

    # Use None as a sentinel to signal the threads to end
    queue.put(None)

    t = threading.Thread(target=worker, args=(queue,results,lock))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print sorted(results)

print "All done"
于 2012-12-21T03:01:35.963 回答