问题是没有主要平台(截至 2013 年中)允许您创建接近这个数量的线程。您可能会遇到各种不同的限制,并且在不了解您的平台、其配置以及您遇到的确切错误的情况下,不可能知道您遇到了哪一个。但这里有两个例子:
- 在 32 位 Windows 上,默认线程堆栈为 1MB,并且您的所有线程堆栈必须与程序中的其他所有内容一样放入相同的 2GB 虚拟内存空间,因此您将在 60000 之前用完。
- 在 64 位 linux 上,您可能会
ulimit
在几乎耗尽页面空间之前耗尽会话的软值之一。(Linux 有多种不同的限制,超出了 POSIX 要求的限制。)
那么,我如何控制要创建的线程数或任何其他方式使其像超时或其他方式一样工作?
使用尽可能多的线程不太可能是您真正想要做的事情。在 8 核机器上运行 800 个线程意味着您要花费大量时间在线程之间进行上下文切换,并且缓存会在准备好之前不断刷新,等等。
最有可能的是,您真正想要的是以下之一:
- 每个 CPU 一个线程,服务于 60000 个任务池。
- 也许是进程而不是线程(如果主要工作是在 Python 中,或者在没有明确释放 GIL 的 C 代码中)。
- 可能是固定数量的线程(例如,Web 浏览器可能一次执行 12 个并发请求,无论您有 1 个内核还是 64 个内核)。
- 也许是一个池,比如说,600 批每批 100 个任务,而不是 60000 个单个任务。
- 60000 个协同调度的光纤/greenlets/微线程都共享一个真实线程。
- 也许是显式协程而不是调度程序。
- 或通过例如“神奇”合作greenlets
gevent
。
- 每个 CPU 可能有一个线程,每个线程运行 1/N 的纤程。
但这当然是可能的。
一旦你达到了你所达到的任何限制,在线程完成其工作并加入之前,再次尝试很可能会失败,并且很可能在此之后再次尝试会成功。因此,鉴于您显然遇到了异常,您可以像处理 Python 中的其他任何事情一样处理它:使用try
/except
块。例如,像这样:
threads = []
for n in range(0, 60000):
while True:
t = threading.Thread(target=function,args=(x, n))
try:
t.start()
threads.append(t)
except WhateverTheExceptionIs as e:
if threads:
threads[0].join()
del threads[0]
else:
raise
else:
break
for t in threads:
t.join()
当然,这假设启动的第一个任务很可能是第一个完成的任务。如果这不是真的,您将需要某种方式来明确表示完成(条件、信号量、队列等),或者您需要使用一些较低级别(特定于平台)的库来为您提供一种方法等待整个列表,直到至少一个线程完成。
另外,请注意,在某些平台(例如,Windows XP)上,您可能会在接近极限时出现奇怪的行为。
除了变得更好之外,做正确的事情也可能会简单得多。例如,这是一个每个 CPU 的进程池:
with concurrent.futures.ProcessPoolExecutor() as executor:
fs = [executor.submit(function, x, n) for n in range(60000)]
concurrent.futures.wait(fs)
…和一个固定线程数池:
with concurrent.futures.ThreadPoolExecutor(12) as executor:
fs = [executor.submit(function, x, n) for n in range(60000)]
concurrent.futures.wait(fs)
......和一个平衡-CPU-parallelism-with-numpy-vectorization 批处理池:
with concurrent.futures.ThreadPoolExecutor() as executor:
batchsize = 60000 // os.cpu_count()
fs = [executor.submit(np.vector_function, x,
np.arange(n, min(n+batchsize, 60000)))
for n in range(0, 60000, batchsize)]
concurrent.futures.wait(fs)
在上面的示例中,我使用列表推导来提交所有作业并收集它们的未来,因为我们没有在循环内做任何其他事情。但是从您的评论来看,听起来您确实在循环中还有其他想要做的事情。因此,让我们将其转换回显式for
语句:
with concurrent.futures.ProcessPoolExecutor() as executor:
fs = []
for n in range(60000):
fs.append(executor.submit(function, x, n))
concurrent.futures.wait(fs)
现在,无论你想在那个循环中添加什么,你都可以。
但是,我认为您实际上并不想在该循环中添加任何内容。循环只是尽可能快地提交所有作业;它是wait
等待它们全部完成的功能,并且您可能想早点退出。
为此,您可以wait
与FIRST_COMPLETED
标志一起使用,但使用起来要简单得多as_completed
。
另外,我假设error
是由任务设置的某种值。在这种情况下,您需要Lock
在它周围加上一个,就像线程之间共享的任何其他可变值一样。ProcessPoolExecutor
(这是一个地方,a和 a之间的差异略大于一条线ThreadPoolExecutor
——如果你使用进程,你需要multiprocessing.Lock
而不是threading.Lock
.)
所以:
error_lock = threading.Lock
error = []
def function(x, n):
# blah blah
try:
# blah blah
except Exception as e:
with error_lock:
error.append(e)
# blah blah
with concurrent.futures.ProcessPoolExecutor() as executor:
fs = [executor.submit(function, x, n) for n in range(60000)]
for f in concurrent.futures.as_completed(fs):
do_something_with(f.result())
with error_lock:
if len(error) > 1: exit()
但是,您可能需要考虑不同的设计。一般来说,如果你能避免线程之间的共享,你的生活会变得更轻松。期货的设计就是为了让这件事变得简单,让你返回一个值或引发一个异常,就像一个普通的函数调用一样。这f.result()
将为您提供返回值或引发引发的异常。因此,您可以将该代码重写为:
def function(x, n):
# blah blah
# don't bother to catch exceptions here, let them propagate out
with concurrent.futures.ProcessPoolExecutor() as executor:
fs = [executor.submit(function, x, n) for n in range(60000)]
error = []
for f in concurrent.futures.as_completed(fs):
try:
result = f.result()
except Exception as e:
error.append(e)
if len(error) > 1: exit()
else:
do_something_with(result)
请注意这与文档中的ThreadPoolExecutor 示例有多么相似。这个简单的模式足以处理几乎任何没有锁的事情,只要任务不需要相互交互。