We have some parallel processing code built around Pebble. It has been running reliably for quite a while, but we seem to have run into an odd edge case.

Based on the exception trace (and the very simple code that feeds it), I suspect it's actually a bug in Pebble, but who knows.

The code that sets up the process pool is very simple:

from pebble import ProcessPool

pool = ProcessPool(max_workers=10, max_tasks=10)
for path in filepaths:
    try:
        # args must be a tuple, hence the trailing comma
        future = pool.schedule(function=self.analyse_file, args=(path,), timeout=30)
        future.add_done_callback(self.process_result)
    except Exception as e:
        print("Exception fired: " + str(e))  # NOT where the exception is firing

pool.close()
pool.join()

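So essentially, we schedule a bunch of tasks, close the pool, and then wait for the pool to finish the scheduled tasks. Note: the exception is not raised in the scheduling loop; it fires when we call join().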
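The traceback shows the task being pickled inside Pebble's `task_scheduler_loop` thread. Assuming the payload it sends includes the function we passed, handing Pebble the bound method `self.analyse_file` means the instance `self`, with everything in its `__dict__`, gets pickled too. A minimal stdlib-only sketch of that behaviour, using a hypothetical `Analyser` class as a stand-in for our real one:

```python
import pickle


class Analyser:
    """Hypothetical stand-in for the class that owns analyse_file."""

    def __init__(self):
        self.results = {}  # instance state that travels with the bound method

    def analyse_file(self, path):
        return len(path)


analyser = Analyser()
analyser.results["already_done.txt"] = 42

# Pickling a bound method pickles the instance it is bound to,
# including every attribute in its __dict__.
payload = pickle.dumps(analyser.analyse_file)
restored = pickle.loads(payload)

print(restored.__self__.results)
print(restored("some/path.txt"))
```

If that reading is right, every send from the scheduler thread serializes `self` as it is at that moment, while the main thread and our done-callbacks may still be using the same object.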

Here is the exception stack trace:

Traceback (most recent call last):
  File "/home/user/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/user/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/user/.pyenv/versions/scrapeapp/lib/python3.6/site-packages/pebble/pool/process.py", line 150, in task_scheduler_loop
    pool_manager.schedule(task)
  File "/home/user/.pyenv/versions/scrapeapp/lib/python3.6/site-packages/pebble/pool/process.py", line 198, in schedule
    self.worker_manager.dispatch(task)
  File "/home/user/.pyenv/versions/scrapeapp/lib/python3.6/site-packages/pebble/pool/process.py", line 327, in dispatch
    self.pool_channel.send(WorkerTask(task.id, task.payload))
  File "/home/user/.pyenv/versions/scrapeapp/lib/python3.6/site-packages/pebble/pool/channel.py", line 66, in send
    return self.writer.send(obj)
  File "/home/user/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/user/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
RuntimeError: dictionary changed size during iteration

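I think this must be some kind of strange race condition, because the code runs perfectly on some datasets but fails at seemingly random points on another.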
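One way `pickle` produces `RuntimeError: dictionary changed size during iteration` is when a dict grows while it is being serialized. The sketch below forces that deterministically with a hypothetical `Mutator` whose `__reduce__` adds a key to the dict being pickled. In our program the mutation would presumably come from another thread (for example, something updating an attribute of `self` while Pebble's scheduler thread pickles it); that is an assumption, not something the traceback proves.

```python
import pickle


class Mutator:
    """Hypothetical value that adds a key to its owning dict while it is pickled."""

    def __init__(self, owner):
        self.owner = owner

    def __reduce__(self):
        # Stand-in for another thread mutating shared state mid-serialization.
        self.owner["added_during_pickling"] = True
        return (int, (0,))  # unpickles as a plain 0


# Put the mutator first and give the dict plenty of other entries, so the
# pickler is still walking the dict after the extra key has been added.
state = {"mutator": None}
state.update({f"key{i}": i for i in range(1500)})
state["mutator"] = Mutator(state)

try:
    pickle.dumps(state)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

print(error_message)
```

This only shows that pickle reports this error when the dict it is serializing changes size; whether that is what happens inside Pebble depends on what else touches `self` (or the task arguments) while the pool is running.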

We were on pebble 4.3.1 when we first hit the problem (we've used that same version from the start). We tried upgrading to 4.5.0, and nothing changed.

Has anyone run into a similar issue with Pebble before? If so, what was your workaround?
