We have some parallel-processing code built around Pebble that has been running for quite a while, but we seem to have hit a strange edge case.
Based on the exception trace (and the simplicity of the code feeding it), I suspect it is actually a bug in Pebble, but who knows.
The code that drives the process pool is very simple:
from pebble import ProcessPool

pool = ProcessPool(max_workers=10, max_tasks=10)
for path in filepaths:
    try:
        future = pool.schedule(function=self.analyse_file, args=(path,), timeout=30)
        future.add_done_callback(self.process_result)
    except Exception as e:
        print("Exception fired: " + str(e))  # NOT where the exception is firing
pool.close()
pool.join()
So essentially we schedule a batch of work, close the pool, and then wait for the pool to finish the scheduled tasks. Note: the exception is not raised inside the scheduling loop; it only surfaces once we call join().
Here is the exception stack trace:
Traceback (most recent call last):
File "/home/user/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/home/user/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/home/user/.pyenv/versions/scrapeapp/lib/python3.6/site-packages/pebble/pool/process.py", line 150, in task_scheduler_loop
pool_manager.schedule(task)
File "/home/user/.pyenv/versions/scrapeapp/lib/python3.6/site-packages/pebble/pool/process.py", line 198, in schedule
self.worker_manager.dispatch(task)
File "/home/user/.pyenv/versions/scrapeapp/lib/python3.6/site-packages/pebble/pool/process.py", line 327, in dispatch
self.pool_channel.send(WorkerTask(task.id, task.payload))
File "/home/user/.pyenv/versions/scrapeapp/lib/python3.6/site-packages/pebble/pool/channel.py", line 66, in send
return self.writer.send(obj)
File "/home/user/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/home/user/.pyenv/versions/3.6.0/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
RuntimeError: dictionary changed size during iteration
I think this must be some weird race condition, because the code runs perfectly on some data sets and then fails at a seemingly random point on another.
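What makes me lean towards a race: this particular RuntimeError is easy to trigger outside of Pebble by pickling a dictionary while another thread is resizing it, which is roughly what I imagine could happen if the payload being sent to the workers (for example the instance behind the bound method self.analyse_file) is mutated by a callback while Pebble's scheduler thread is serialising it. Below is a standalone sketch of that general mechanism; the names state and mutate are purely illustrative (not from our code), and the reproduction is timing-dependent, so it may need a few runs.

import pickle
import threading

# 'state' stands in for a dict hanging off the instance that gets pickled
# as part of the task payload; 'mutate' stands in for another thread
# (e.g. a done-callback) updating it concurrently.
state = {"results": {}}

def mutate(stop_event):
    i = 0
    while not stop_event.is_set():
        state["results"][i] = i          # keep growing the inner dict...
        if len(state["results"]) > 10000:
            state["results"].clear()     # ...and occasionally reset it
        i += 1

stop_event = threading.Event()
worker = threading.Thread(target=mutate, args=(stop_event,))
worker.start()
try:
    for _ in range(1000):
        # pickle iterates the inner dict; if the other thread resizes it
        # mid-iteration, dumps() raises
        # "RuntimeError: dictionary changed size during iteration"
        pickle.dumps(state)
except RuntimeError as e:
    print("Reproduced:", e)
finally:
    stop_event.set()
    worker.join()

Whether something equivalent actually happens inside Pebble's task_scheduler_loop I cannot tell from the outside, hence the question.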
We first hit the problem on pebble 4.3.1 (the same version we had been using from the start); upgrading to 4.5.0 changed nothing.
Has anyone run into a similar problem with Pebble before? If so, what was your workaround?