
I have a file containing 600K+ lines that I want to process.
So I am using multithreading to speed up the process.
The problem is: if I use, for example, 50 as the thread count, then after processing 50 lines the script just does nothing. It doesn't terminate, and it doesn't show anything else.

Here is my code for reference:

#!/usr/bin/env python

from __future__ import print_function
import re
import sys
from Queue import *
from threading import Thread, Lock

#parse command-line parameters
if len(sys.argv) != 3:  # the program name and the two arguments
    # stop the program and print an error message
    sys.exit("Usage: python " + sys.argv[0] + " filename maxthreads")

accountlist = sys.argv[1]
maxthreads = int(sys.argv[2])

def dojob(email, password):
    #here is some job to process all my users data
    pass
    #end dojob

#this function will process the items in the queue, in serial
def processor():
    if queue.empty() == True:
        print ("the Queue is empty!")
        sys.exit(1)
    try:
        job = queue.get()
        job = job.strip('\r\n')

        newdata = job.split(':')

        email = newdata[0]
        password = newdata[1]

        #pass to dojob and process
        print("Processing:", email)

        dojob(email, password)

        queue.task_done()

    except:
        print ("Failed to operate on job")

#set variables
queue = Queue()
threads = maxthreads

#a list of job items. you would want this to be more advanced, like reading from a file or database
jobs = open(accountlist)

#iterate over jobs and put each into the queue in sequence
for job in jobs:
    print ("inserting job into the queue:", job)
    queue.put(job)

#start some threads, each one will process one job from the queue
for i in range(threads):
    th = Thread(target=processor)
    th.setDaemon(True)
    th.start()

#wait until all jobs are processed before quitting
queue.join() 

Any ideas why it just stops processing?

Sample output:

 #for example, the thread count is 2
 inserting job into the queue: user@domain.com
 inserting job into the queue: user2@domain.com
 inserting job into the queue: another@domain.com
 (...until the end of the file...)
 #once everything was added to the queue, it starts processing.
 Processing: user@domain.com
 Processing: user2@domain.com
 #then here the problem occurs, it doesn't do anything else.
 #it doesn't continue to the next queued job.

1 Answer


It sounds like you need a loop in processor():

def processor():
    while not queue.empty():
        try:
            job = queue.get()
            ...

Otherwise, each thread processes one job and then stops.
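Putting that loop in place, a minimal self-contained sketch might look like the following. This is Python 3 (where the `Queue` module was renamed to `queue`); the sample addresses, the `get_nowait()`-based drain, and the `processed` list are illustrative assumptions, not the asker's exact code:

```python
import queue      # named "Queue" in Python 2
import threading

processed = []    # collected results, just for demonstration

def dojob(email, password):
    # placeholder for the real per-user work
    processed.append(email)

def processor(q):
    # Loop until the queue is drained, instead of handling a single job.
    while True:
        try:
            job = q.get_nowait()          # raises queue.Empty once drained
        except queue.Empty:
            return
        try:
            email, password = job.strip('\r\n').split(':', 1)
            dojob(email, password)
        finally:
            q.task_done()                 # mark the job done even on error

q = queue.Queue()
for line in ("user@domain.com:pw1", "user2@domain.com:pw2",
             "another@domain.com:pw3"):
    q.put(line)

workers = [threading.Thread(target=processor, args=(q,), daemon=True)
           for _ in range(2)]
for t in workers:
    t.start()
q.join()    # returns once every queued job has been marked done
```

Marking the job done in a `finally` block also keeps `queue.join()` from hanging forever when one job raises, which the original bare `except` would otherwise cause.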

"I use multithreading to speed up the process."

Depending on the nature of the processing, you may or may not get any speedup from using multiple threads. This is due to the Global Interpreter Lock (GIL). If you find that you're not getting any speedup because of the GIL, you might want to consider using the multiprocessing module instead.

Answered 2012-12-07T22:28:25.957