Here is the example code from the Python documentation:

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

I modified it to fit my use case, as follows:

import threading
from Queue import Queue

max_threads = 10

q = Queue(maxsize=max_threads + 2)

def worker():
  while True:
    task = q.get(1)
    # do something with the task
    q.task_done()

for i in range(max_threads):
  t = threading.Thread(target=worker)
  t.start()

for task in ['a', 'b', 'c']:
  q.put(task)

q.join()

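When I run it, the debugger says that all the jobs have been executed, but q.join() seems to wait forever. How can I signal to the worker threads that all the tasks have already been sent?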

3 Answers

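The process never finishes after .join() because the worker threads keep waiting for new queue data (they block in .get()).

Here is an approach that uses a simple flag, finishUp, to tell the workers to exit. We set it after .join() returns, which means that all tasks have been processed. I added a timeout to the q.get() call so that each worker gets a chance to check the finishUp flag.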

import threading
import queue

max_threads = 5
q = queue.Queue(maxsize=max_threads + 2)
finishUp = False

def worker():
    while True:
        try:
            task = q.get(block=True, timeout=1)
        except queue.Empty:  # no item arrived within the timeout
            if finishUp:
                print("thread finishing because processing is done")
                return
            continue  # not finished yet, keep waiting for work
        # do something with the task
        print("processing task for:" + str(task))
        q.task_done()

for i in range(max_threads):
    t = threading.Thread(target=worker)
    t.start()

for task in ['a', 'b', 'c']:
    q.put(task)

print("waiting on join")
q.join()
finishUp = True  # let the workers know that they can exit
print("finished")

This produces the following output:

waiting on join
processing task for:a
processing task for:b
processing task for:c
finished
thread finishing because processing is done
thread finishing because processing is done
thread finishing because processing is done
thread finishing because processing is done
thread finishing because processing is done

Process finished with exit code 0
answered 2019-01-05T19:59:56.330

q.join() does actually return. You can check this by putting print('done') on the line after q.join():

....
q.join()
print('done')

So why doesn't the program end? Because threads are non-daemon threads by default.

You can turn a thread into a daemon thread with <thread_object>.daemon = True:

for i in range(max_threads):
    t = threading.Thread(target=worker)
    t.daemon = True # <---
    t.start()

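According to the threading module documentation:

daemon

A boolean value indicating whether this thread is a daemon thread (True) or not (False). This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread, and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when no alive non-daemon threads are left.

New in version 2.6.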
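
To make the point above concrete, here is a minimal sketch of the question's program with daemon workers. The function name run_with_daemon_workers, the results list, and the upper() call standing in for real work are my own placeholders, not part of the original code.

```python
import queue
import threading


def run_with_daemon_workers(tasks, num_workers=3):
    """Process tasks with daemon workers and return the results after q.join()."""
    q = queue.Queue()
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            task = q.get()  # blocks forever once the queue is drained
            with lock:
                results.append(task.upper())  # placeholder for the real work
            q.task_done()

    for _ in range(num_workers):
        # daemon=True: the blocked workers will not keep the interpreter alive
        threading.Thread(target=worker, daemon=True).start()

    for task in tasks:
        q.put(task)

    q.join()  # returns as soon as every task has been marked done
    return sorted(results)


if __name__ == "__main__":
    print(run_with_daemon_workers(["a", "b", "c"]))
```

Keep in mind that daemon threads are stopped abruptly when the program exits, so this only suits workers that hold nothing that needs a clean release.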

answered 2013-10-15T18:06:48.343

I define a DONE object to signal the end of the work:

DONE = object()

When the upper layer knows that no more data is coming, it puts the object on the queue:

q.put_nowait(DONE)

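In the worker thread, the thread exits as soon as it receives the object. But if other threads are listening on the same queue, we have to put the object back on the queue: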

item = q.get()
if item is DONE:
    q.put_nowait(DONE)
    return
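
Put together, the fragments above might look like the following sketch. The function name run_with_sentinel, the results list, and the upper() call are hypothetical stand-ins; I am also assuming an unbounded queue so that putting DONE back can never raise queue.Full.

```python
import queue
import threading

DONE = object()  # unique sentinel that marks the end of the work


def run_with_sentinel(tasks, num_workers=3):
    """Process tasks, then stop every worker by passing one DONE sentinel along."""
    q = queue.Queue()  # unbounded (assumption), so put_nowait(DONE) cannot fail
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            item = q.get()
            if item is DONE:
                q.put_nowait(DONE)  # hand the sentinel on to the next worker
                return
            with lock:
                results.append(item.upper())  # placeholder for the real work

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()

    for task in tasks:
        q.put(task)
    q.put_nowait(DONE)  # no more data is coming

    for t in threads:
        t.join()  # wait for the threads themselves to exit
    return sorted(results)


if __name__ == "__main__":
    print(run_with_sentinel(["a", "b", "c"]))
```

The sketch waits on the threads instead of calling q.join(), because the sentinel stays in the queue and is never marked with task_done().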

Cheers :)

answered 2020-08-06T01:25:37.547