对于数据提取作业,我使用 ProcessPoolExecutor 从存储中提取数据,一旦提取达到内存中记录的阈值数量,提取的数据集通过 ProcessPoolExecutor 移交给另一个进程,然后主进程/迭代器继续获取记录。通过 ProcessPoolExecutor 生成的进程将转换、过滤接收到的数据集中的记录。
在处理较大的数据集时,我经常会经常收到以下 BrokenProcessPool。以此为提示(更大的数据集),调整相关参数,这个错误会延迟一点,但最终会抛出错误。由于缺乏信息,它仍然是一个不确定的谜题,无法确定它何时/哪个点会再次发生。
我确实查看了其他问题,但无法将其用作信息。
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib64/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib64/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib64/python3.6/concurrent/futures/process.py", line 295, in _queue_management_worker
shutdown_worker()
File "/usr/lib64/python3.6/concurrent/futures/process.py", line 253, in shutdown_worker
call_queue.put_nowait(None)
File "/usr/lib64/python3.6/multiprocessing/queues.py", line 129, in put_nowait
return self.put(obj, False)
File "/usr/lib64/python3.6/multiprocessing/queues.py", line 83, in put
raise Full
queue.Full
更具体地说,这里是 queues.py 中引发 queue.Full 错误的代码片段。
def put(self, obj, block=True, timeout=None):
assert not self._closed, "Queue {0!r} is closed".format(self)
if not self._sem.acquire(block, timeout):
raise Full
我不清楚在什么情况下会引发此错误,
- 是因为信号量不足吗?
- 是由于快速生产者和慢消费者的场景导致队列被填满吗?如果是这种情况,是否可以阻止写入队列的进程?
- 有没有办法知道队列中的可用空间?这样可以在编写器成功将元素放入队列之前对其进行限制吗?
关于如何确定性调试的任何想法或指示?