这是我的情况:我需要在一个集群上运行 300 个进程(它们是独立的),它们都将它们的一部分数据添加到同一个 DataFrame 中(因此它们还需要在写入之前读取文件)。他们可能需要在整个运行时多次执行此操作。
所以我尝试在包中使用写锁定文件portalocker
。但是,我遇到了一种错误,我不明白它来自哪里。
这是每个进程将写入同一文件的框架代码:
with portalocker.Lock('/path/to/file.pickle', 'rb+', timeout=120) as file:
file.seek(0)
df = pd.read_pickle(file)
# ADD A ROW TO THE DATAFRAME
# The following part might not be great,
# I'm trying to remove the old contents of the file first so I overwrite
# and not append, not sure if this is required or if there's
# a better way to do this.
file.seek(0)
file.truncate()
df.to_pickle(file)
上述工作,大部分时间。但是,我有写锁定的同时进程越多,我在pd.read_pickle(file)
舞台上得到的 EOFError 错误就越多。
EOFError: Ran out of input
回溯非常长且令人费解。
无论如何,到目前为止我的想法是,由于它有时可以工作,所以上面的代码一定很好*(尽管它可能很混乱,我不介意听到更好的方法来做同样的事情)。
但是,当我有太多进程尝试写锁定时,我怀疑文件没有时间保存或其他东西,或者至少以某种方式下一个进程还没有看到前一个进程保存的内容。
有办法解决吗?我尝试在time.sleep(0.5)
我的代码周围添加语句(在 之前read_pickle
,在 之后to_pickle
),但我认为它没有帮助。有谁知道会发生什么或知道更好的方法来做到这一点?
另请注意,我认为写锁定不会超时。我尝试对这个过程进行计时,并且我还在其中添加了一个标志来标记写锁定是否超时。虽然有 300 个进程并且它们可能正在尝试写入并改变速率,但总的来说,我估计每秒大约有 2.5 次写入,这似乎不会使系统过载,不是吗?*
*pickled DataFrame 的大小为几百 KB。