0

对于数据提取作业,我使用 ProcessPoolExecutor 从存储中提取数据,一旦提取达到内存中记录的阈值数量,提取的数据集通过 ProcessPoolExecutor 移交给另一个进程,然后主进程/迭代器继续获取记录。通过 ProcessPoolExecutor 生成的进程将转换、过滤接收到的数据集中的记录。

在处理较大的数据集时,我经常会经常收到以下 BrokenProcessPool。以此为提示(更大的数据集),调整相关参数,这个错误会延迟一点,但最终会抛出错误。由于缺乏信息,它仍然是一个不确定的谜题,无法确定它何时/哪个点会再次发生。

我确实查看了其他问题,但无法将其用作信息。

concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib64/python3.6/concurrent/futures/process.py", line 295, in _queue_management_worker
    shutdown_worker()
  File "/usr/lib64/python3.6/concurrent/futures/process.py", line 253, in shutdown_worker
    call_queue.put_nowait(None)
  File "/usr/lib64/python3.6/multiprocessing/queues.py", line 129, in put_nowait
    return self.put(obj, False)
  File "/usr/lib64/python3.6/multiprocessing/queues.py", line 83, in put
    raise Full
queue.Full

更具体地说,这里是 queues.py 中引发 queue.Full 错误的代码片段。

def put(self, obj, block=True, timeout=None):
    assert not self._closed, "Queue {0!r} is closed".format(self)
    if not self._sem.acquire(block, timeout):
        raise Full

我不清楚在什么情况下会引发此错误,

  1. 是因为信号量不足吗?
  2. 是由于快速生产者和慢消费者的场景导致队列被填满吗?如果是这种情况,是否可以阻止写入队列的进程?
  3. 有没有办法知道队列中的可用空间?这样可以在编写器成功将元素放入队列之前对其进行限制吗?

关于如何确定性调试的任何想法或指示?

4

0 回答 0