2

我正在使用队列文档页面底部的队列实现,我有几个问题:

  1. 有 50 个线程,它们获取数十万个 JSON 文件。将线程数设置为两倍是个好主意Queue(maxsize=,还是应该或多或少?

  2. 我想将worker功能放在不同的模块中。我该怎么办?当我尝试这样做时,我进入NameError: global name 'q' is not defineditem = q.get()函数行;添加global q到功能没有帮助。

编辑:是否可以将工作函数移动到另一个模块,只需对该代码进行最少的更改?我不太了解对象,我希望代码尽可能简单。

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()      
4

1 回答 1

1

你可以尝试这样的事情

class Worker(threading.Thread):
    def __init__(self,inputQueue,inputLock):
        threading.Thread.__init__(self)
        self.inputQueue=inputQueue
        self.inputLock=inputLock          

    def run(self):       
        while True:
            if not self.inputQueue.empty():
                self.inputLock.acquire()
                item=self.inputQueue.get()
                self.inputLock.release()
        #process item
                self.inputQueue.task_done()
            else:
                time.sleep(1)

class Main():

    def __init__(self):
        self.workersQueue = Queue.Queue()
        self.workersLock = threading.RLock()
        self.workers = list()

        #start threads
        for index in range(10):            
            self.uploaders.append(Worker(self.workersQueue, self.workersLock))
            self.uploaders[index].start()

        self.workersQueue.join()

这里的worker是一个类(一个线程,队列是共享的)。主类用于创建工作线程,从而加载工作线程所需的数据。

希望这会有所帮助

于 2013-01-05T17:43:21.860 回答