1

这是我的情况:我需要在一个集群上运行 300 个进程(它们是独立的),它们都它们的一部分数据添加到同一个 DataFrame 中(因此它们还需要在写入之前读取文件)。他们可能需要在整个运行时多次执行此操作。

所以我尝试在包中使用写锁定文件portalocker。但是,我遇到了一种错误,我不明白它来自哪里。

这是每个进程将写入同一文件的框架代码:

with portalocker.Lock('/path/to/file.pickle', 'rb+', timeout=120) as file:
    file.seek(0)
    df = pd.read_pickle(file)

    # ADD A ROW TO THE DATAFRAME

    # The following part might not be great,
    # I'm trying to remove the old contents of the file first so I overwrite
    # and not append, not sure if this is required or if there's
    # a better way to do this.
    file.seek(0)
    file.truncate()
    df.to_pickle(file)

上述工作,大部分时间。但是,我有写锁定的同时进程越多,我在pd.read_pickle(file)舞台上得到的 EOFError 错误就越多。

EOFError: Ran out of input

回溯非常长且令人费解。

无论如何,到目前为止我的想法是,由于它有时可以工作,所以上面的代码一定很好*(尽管它可能很混乱,我不介意听到更好的方法来做同样的事情)。

但是,当我有太多进程尝试写锁定时,我怀疑文件没有时间保存或其他东西,或者至少以某种方式下一个进程还没有看到前一个进程保存的内容。

有办法解决吗?我尝试在time.sleep(0.5)我的代码周围添加语句(在 之前read_pickle,在 之后to_pickle),但我认为它没有帮助。有谁知道会发生什么或知道更好的方法来做到这一点?

另请注意,我认为写锁定不会超时。我尝试对这个过程进行计时,并且我还在其中添加了一个标志来标记写锁定是否超时。虽然有 300 个进程并且它们可能正在尝试写入并改变速率,但总的来说,我估计每秒大约有 2.5 次写入,这似乎不会使系统过载,不是吗?*

*pickled DataFrame 的大小为几百 KB。

4

1 回答 1

0

这可能是一个有点模糊的问题,但由于我设法解决了它并更好地了解了情况,我认为在这里发布我发现的内容会很有帮助,希望其他人也能找到这个。在我看来,这似乎是一个可能会出现在其他人身上的问题。

感谢“portallocker” github 页面的帮助和回答:https ://github.com/WoLpH/portallocker/issues/40

问题似乎是我在集群上执行此操作。结果,文件系统有点复杂,因为进程在多个节点上运行。保存的文件“同步”并被不同的运行进程看到可能需要比预期更长的时间。

似乎对我有用的是刷新文件并强制系统同步,此外(不确定这是否是可选的),然后添加更长的“time.sleep()”。

根据“portallocker”开发人员的说法,文件在集群上同步可能需要不可预测的时间,因此您可能需要改变睡眠时间。

换句话说,在保存文件后添加:

df.to_pickle(file)
file.flush()
os.fsync(file.fileno())

time.sleep(1)

希望这可以帮助其他人。

于 2018-10-30T15:20:56.827 回答