一个古老的问题,以及变体self._sentinel = Object()
将起作用。在 2021 年重新审视这一点,我建议将concurrent.futures与 usingNone
作为您的哨兵结合使用:
# Note: this is Python 3.8+ code
import queue
import time
import functools
import random
from concurrent.futures import ThreadPoolExecutor
def worker(tup):
(q,i) = tup
print(f"Starting thread {i}")
partial_sum = 0
numbers_added = 0
while True:
try:
item = q.get()
if item is None:
# 'propagate' this 'sentinel' to anybody else
q.put(None)
break
numbers_added += 1
partial_sum += item
# need to pretend that we're doing something asynchronous
time.sleep(random.random()/100)
except Exception as e:
print(f"(warning) Thread {i} got an exception {e}, that shouldn't happen.")
break
print(f"Thread {i} is done, saw a total of {numbers_added} numbers to add up")
return partial_sum
MAX_RANGE = 1024
MAX_THREADS = 12
with ThreadPoolExecutor() as executor:
# create a queue with numbers to add up
(q := queue.Queue()).queue = queue.deque(range(MAX_RANGE))
# kick off the threads
future_partials = executor.map(worker, [(q,i) for i in range(MAX_THREADS)])
# they'll be done more or less instantly, but we'll make them wait
print("Threads launched with first batch ... sleeping 2 seconds")
time.sleep(2)
# threads are still available for more work!
for i in range(MAX_RANGE):
q.put(i)
print("Finished giving them another batch, this time we're not sleeping")
# now we tell them all to wrap it up
q.put(None)
# this will nicely catch the outputs
sum = functools.reduce(lambda x, y: x+y, future_partials)
print(f"Got total sum {sum} (correct answer is {(MAX_RANGE-1)*MAX_RANGE}")
# Starting thread 0
# Starting thread 1
# Starting thread 2
# Starting thread 3
# Starting thread 4
# Starting thread 5
# Starting thread 6
# Starting thread 7
# Starting thread 8
# Starting thread 9
# Starting thread 10
# Starting thread 11
# Threads launched with first batch ... sleeping 2 seconds
# Finished giving them another batch, this time we're not sleeping
# Thread 0 is done, saw a total of 175 numbers to add up
# Thread 3 is done, saw a total of 178 numbers to add up
# Thread 11 is done, saw a total of 173 numbers to add up
# Thread 4 is done, saw a total of 177 numbers to add up
# Thread 9 is done, saw a total of 169 numbers to add up
# Thread 1 is done, saw a total of 172 numbers to add up
# Thread 7 is done, saw a total of 162 numbers to add up
# Thread 10 is done, saw a total of 161 numbers to add up
# Thread 5 is done, saw a total of 169 numbers to add up
# Thread 2 is done, saw a total of 157 numbers to add up
# Thread 6 is done, saw a total of 169 numbers to add up
# Thread 8 is done, saw a total of 186 numbers to add up
# Got total sum 1047552 (correct answer is 1047552
请注意事实上的“主线程”只需要推None
送到队列中,类似于条件变量“信号”,线程都拾取(并传播)。
此外,这不使用Queue
比标准(线程安全)队列更重的多处理器。上面的代码还具有易于修改为使用ProcessPoolExecutor
或两者混合的好处(在任何一种情况下,您都需要使用multiprocessing.Queue
)。
(旁注:一般来说,如果需要类来解决任何给定一代 Python 中的“基本”问题,在更现代的版本中通常会有新的选项。)
(旁注:代码是 Python 3.8+ 的唯一原因是因为我是赋值表达式的粉丝,根据上面的旁注,它解决了如何从列表中初始化队列的历史问题,而无需诉诸非功能性解决方案。)