12

我有一个昂贵的函数,它接受并返回少量数据(一些整数和浮点数)。我已经记住了这个功能,但我想让备忘录持久化。已经有几个线程与此相关,但我不确定某些建议方法的潜在问题,并且我有一些相当具体的要求:

  • 我肯定会同时使用来自多个线程和进程的函数(使用multiprocessing和来自单独的 python 脚本)
  • 我不需要从这个 python 函数之外读取或写入备忘录
  • 我并不担心备忘录在极少数情况下被损坏(比如拔掉插头或意外写入文件而不锁定它),因为重建并不昂贵(通常 10-20 分钟),但我更喜欢它不会因为异常或手动终止 python 进程而损坏(我不知道这有多现实)
  • 我非常喜欢不需要大型外部库的解决方案,因为我在一台机器上的硬盘空间非常有限,我将在其上运行代码
  • 我对跨平台代码的偏好很弱,但我可能只会在 Linux 上使用它

该线程讨论了该shelve模块,该模块显然不是进程安全的。其中两个答案建议使用fcntl.flock锁定搁置文件。然而,这个线程中的一些回复似乎表明这充满了问题——但我不确定它们是什么。听起来好像这仅限于 Unix(尽管显然 Windows 有一个等效的称为msvcrt.locking),并且锁定只是“建议”-即,它不会阻止我在不检查文件是否已锁定的情况下意外写入文件。还有其他潜在的问题吗?写入文件副本并在最后一步替换主副本会降低损坏风险吗?

看起来dbm 模块不会比搁置更好。我已经快速浏览了一下sqlite3,但为此目的似乎有点矫枉过正。 这个线程这个线程提到了几个 3rd 方库,包括ZODB,但是有很多选择,而且对于这项任务来说,它们似乎都过于庞大和复杂。

有人有建议吗?

更新:下面都提到了 IncPy,看起来确实很有趣。不幸的是,我不想回到 Python 2.6(我实际上使用的是 3.2),而且与 C 库一起使用看起来有点尴尬(我大量使用 numpy 和 scipy 等)。

kindall 的另一个想法很有启发性,但我认为将其应用于多个进程会有点困难——我想用文件锁定或数据库替换队列是最简单的。

再次查看 ZODB,它看起来确实非常适合这项任务,但我确实想避免使用任何额外的库。我仍然不完全确定简单使用的所有问题是什么flock- 我想一个大问题是如果一个进程在写入文件时终止,或者在释放锁之前?

所以,我接受了 synthesizerpatel 的建议并选择了 sqlite3。如果有人感兴趣,我决定做一个替代品dict,将其条目作为泡菜存储在数据库中(我不会费心将任何内容保存在内存中,因为数据库访问和酸洗与我正在做的其他事情相比足够快)。我确信有更有效的方法可以做到这一点(我不知道我是否仍然存在并发问题),但这里是代码:

from collections import MutableMapping
import sqlite3
import pickle


class PersistentDict(MutableMapping):
    def __init__(self, dbpath, iterable=None, **kwargs):
        self.dbpath = dbpath
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'create table if not exists memo '
                '(key blob primary key not null, value blob not null)'
            )
        if iterable is not None:
            self.update(iterable)
        self.update(kwargs)

    def encode(self, obj):
        return pickle.dumps(obj)

    def decode(self, blob):
        return pickle.loads(blob)

    def get_connection(self):
        return sqlite3.connect(self.dbpath)

    def  __getitem__(self, key):
        key = self.encode(key)
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'select value from memo where key=?',
                (key,)
            )
            value = cursor.fetchone()
        if value is None:
            raise KeyError(key)
        return self.decode(value[0])

    def __setitem__(self, key, value):
        key = self.encode(key)
        value = self.encode(value)
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'insert or replace into memo values (?, ?)',
                (key, value)
            )

    def __delitem__(self, key):
        key = self.encode(key)
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'select count(*) from memo where key=?',
                (key,)
            )
            if cursor.fetchone()[0] == 0:
                raise KeyError(key)
            cursor.execute(
                'delete from memo where key=?',
                (key,)
            )

    def __iter__(self):
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'select key from memo'
            )
            records = cursor.fetchall()
        for r in records:
            yield self.decode(r[0])

    def __len__(self):
        with self.get_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                'select count(*) from memo'
            )
            return cursor.fetchone()[0]
4

2 回答 2

7

sqlite3 开箱即用提供ACID。文件锁定容易出现使用 sqlite3 不会遇到的竞争条件和并发问题。

基本上,是的,sqlite3 超出了您的需要,但这并不是一个巨大的负担。它可以在手机上运行,​​所以它不像你承诺运行一些野兽般的软件。这将节省您重新发明轮子和调试锁定问题的时间。

于 2012-02-16T23:22:52.223 回答
6

我假设您想继续在 RAM 中记忆函数的结果,可能在字典中,但使用持久性来减少应用程序的“预热”时间。在这种情况下,您不会直接在后备存储中随机访问项目,因此数据库可能确实是多余的(尽管正如synthesizerpatel所指出的那样,可能没有您想象的那么多)。

尽管如此,如果你想自己动手,一个可行的策略可能是在开始任何线程之前简单地从文件中加载字典。当结果不在字典中时,您需要在将其添加到字典后将其写入文件。您可以通过将其添加到队列并使用单个工作线程将队列中的项目刷新到磁盘来做到这一点(只需将它们附加到单个文件就可以了)。您可能偶尔会多次添加相同的结果,但这并不是致命的,因为每次都会得到相同的结果,因此将其读回两次或更多次不会造成真正的伤害。Python 的线程模型将使您远离大多数并发问题(例如,附加到列表是原子的)。

这是一些(未经测试的,通用的,不完整的)代码,显示了我在说什么:

import cPickle as pickle

import time, os.path

cache = {}
queue = []

# run at script start to warm up cache
def preload_cache(filename):
    if os.path.isfile(filename):
        with open(filename, "rb") as f:
            while True:
                try:
                    key, value = pickle.load(f), pickle.load(f)
                except EOFError:
                    break
                cache[key] = value

# your memoized function
def time_consuming_function(a, b, c, d):
    key = (a, b, c, d)
    if key in cache:
        return cache[key]
    else:
        # generate the result here
        # ...
        # add to cache, checking to see if it's already there again to avoid writing
        # it twice (in case another thread also added it) (this is not fatal, though)
        if key not in cache:
            cache[key] = result
            queue.append((key, result))
        return result

# run on worker thread to write new items out
def write_cache(filename):
    with open(filename, "ab") as f:
        while True:
            while queue:
                key, value = queue.pop()  # item order not important
                # but must write key and value in single call to ensure
                # both get written (otherwise, interrupting script might
                # leave only one written, corrupting the file)
                f.write(pickle.dumps(key, pickle.HIGHEST_PROTOCOL) +
                        pickle.dumps(value, pickle.HIGHEST_PROTOCOL))
            f.flush()
            time.sleep(1)

如果我有时间,我会把它变成一个装饰器......并将持久性放入一个dict子类......全局变量的使用也是次优的。:-) 如果您使用这种方法,multiprocessing您可能希望使用 amultiprocessing.Queue而不是列表;然后,您可以queue.get()在写入文件的工作进程中用作阻塞等待新结果。不过,我还没有使用过multiprocessing,所以请对这一点建议持保留态度。

于 2012-02-17T00:13:44.573 回答