我正在使用线程和队列来获取 url 并存储到数据库。
我只想要一个线程来做存储工作。
所以我编写如下代码:
import threading
import time
import Queue
site_count = 10
fetch_thread_count = 2
site_queue = Queue.Queue()
proxy_array=[]
class FetchThread(threading.Thread):
def __init__(self,site_queue,proxy_array):
threading.Thread.__init__(self)
self.site_queue = site_queue
self.proxy_array = proxy_array
def run(self):
while True:
index = self.site_queue.get()
self.get_proxy_one_website(index)
self.site_queue.task_done()
def get_proxy_one_website(self,index):
print '{0} fetched site :{1}\n'.format(self.name,index)
self.proxy_array.append(index)
def save():
while True:
if site_queue.qsize() > 0:
if len(proxy_array) > 10:
print 'save :{0} to database\n'.format(proxy_array.pop())
else:
time.sleep(1)
elif len(proxy_array) > 0:
print 'save :{0} to database\n'.format(proxy_array.pop())
elif len(proxy_array) == 0:
print 'break'
break
else:
print 'continue'
continue
def start_crawl():
global site_count,fetch_thread_count,site_queue,proxy_array
print 'init'
for i in range(fetch_thread_count):
ft = FetchThread(site_queue,proxy_array)
ft.setDaemon(True)
ft.start()
print 'put site_queue'
for i in range(site_count):
site_queue.put(i)
save()
print 'start site_queue join'
site_queue.join()
print 'finish'
start_crawl()
执行输出:
init
put site_queue
Thread-1 fetched site :0
Thread-2 fetched site :1
Thread-1 fetched site :2
Thread-2 fetched site :3
Thread-1 fetched site :4
Thread-2 fetched site :5
Thread-1 fetched site :6
Thread-2 fetched site :7
Thread-1 fetched site :8
Thread-2 fetched site :9
save :9 to database
save :8 to database
save :7 to database
save :6 to database
save :5 to database
save :4 to database
save :3 to database
save :2 to database
save :1 to database
save :0 to database
break
start site_queue join
finish
[Finished in 1.2s]
为什么save()
函数运行之后site_queue.join()
写在之后save()
。
我也save()
用线程函数替换了,但它也不起作用。
这是否意味着我必须更改proxy_array=[]
为proxy_queue=Queue.Queue()
,然后我可以使用 theading 来存储数据?
我只想要一个thead 做这个,没有其他theads 会从那里获取数据proxy_array
,我为什么要加入它?使用Queue 似乎很奇怪。
有没有更好的解决方案?
更新:
我不想等到所有 FetchThreads 完成他们的工作。我想在 fethcing 时保存数据,它会快得多。我希望结果如下所示(因为我使用了 array.pop(),所以保存 0 可能会在稍后出现,这只是一个易于理解的示例。):
Thread-2 fetched site :1
Thread-1 fetched site :2
save :0 to database
Thread-2 fetched site :3
Thread-1 fetched site :4
save :2 to database
save :3 to database
Thread-2 fetched site :5
Thread-1 fetched site :6
save :4 to database
.......
UPDATE2 对于某人有以下相同的问题:
问题:
正如我在上面所说的那样,没有任何其他线程会从 proxy_array 获取数据。
我只是无法想象为什么它会破坏线程安全?
回答: misha的回答中的
producer-consumer question,仔细阅读后我明白了。
问题:
还有一个问题,如果程序主线程可以作为消费者使用 FetchThreads(换句话说,不需要创建 StoreThread)
这是我无法弄清楚的,我会在找到答案后更新。