11

我正在尝试处理由以下代码生成的数据:

for Gnodes in G.nodes()       # Gnodes iterates over 10000 values 
    Gvalue = someoperation(Gnodes)
    for Hnodes in H.nodes()   # Hnodes iterates over 10000 values 
        Hvalue =someoperation(Hnodes)
        score = SomeOperation on (Gvalue,Hvalue)
        dic_score.setdefault(Gnodes,[]).append([Hnodes, score, -1 ])

由于字典很大(10000 个键 X 10000 个列表,每个包含 3 个元素),因此很难将其保存在内存中。我一直在寻找一种解决方案,它在生成键:值(以列表的形式)对后立即存储它们。这里建议,以特定格式(Python)编写和阅读字典,将 ZODB 与 Btree 结合使用。

如果这太天真了,请容忍我,我的问题是,什么时候应该调用transaction.commit()提交数据?如果我在内循环结束时调用它,则生成的文件非常大(不知道为什么)。这是一个片段:

storage = FileStorage('Data.fs')
db = DB(store)
connection = db.open()
root = connection.root()
btree_container = IOBTree
root[0] = btree_container 
for nodes in G.nodes()
    btree_container[nodes] = PersistentList () ## I was loosing data prior to doing this 

for Gnodes in G.nodes()       # Gnodes iterates over 10000 values 
    Gvalue = someoperation(Gnodes)
    for Hnodes in H.nodes()   # Hnodes iterates over 10000 values 
        Hvalue =someoperation(Hnodes)
        score = SomeOperation on (Gvalue,Hvalue)
        btree_container.setdefault(Gnodes,[]).append([Hnodes, score, -1 ])
        transaction.commit()

如果我在两个循环之外调用它怎么办?就像是:

    ......
       ......
          score = SomeOperation on (Gvalue,Hvalue)
          btree_container.setdefault(Gnodes,[]).append([Hnodes, score, -1 ])
    transaction.commit()

在我调用 transaction.commit() 之前,所有数据都会保存在内存中吗?同样,我不知道为什么,但这会导致磁盘上的文件变小。

我想最小化内存中保存的数据。任何指导将不胜感激 !

4

2 回答 2

30

您的目标是使您的流程在内存限制内可管理。为了能够将 ZODB 作为一种工具来执行此操作,您需要了解 ZODB 事务是如何工作的,以及如何使用它们。

为什么您的 ZODB 变得如此之大

首先,您需要了解事务提交在这里做了什么,这也解释了为什么您的 Data.fs 变得如此之大。

ZODB 将数据写入每个事务,其中任何已更改的持久对象都将写入磁盘。这里的重要细节是已更改的持久对象;ZODB 以持久对象为单位工作。

并非每个 python 值都是持久对象。如果我定义一个直接的 python 类,它不会是持久的,也不是任何内置的 python 类型,例如 int 或 list。另一方面,您定义的任何继承自的类persistence.Persistent 都是持久对象。类BTrees集以及PeristentList您在代码中使用的类确实继承自Persistent.

现在,在事务提交时,任何已更改的持久对象都将作为该事务的一部分写入磁盘。因此,任何PersistentList已附加到的对象都将完整地写入磁盘。BTrees处理这个更有效率;它们存储存储桶,它们本身是持久的,而存储桶又保存实际存储的对象。因此,对于您创建的每几个新节点,都会将一个 Bucket 写入事务,而不是整个 BTree 结构。请注意,因为保存在树中的项目本身就是持久对象,所以只有对它们的引用存储在 Bucket 记录中。

现在,ZODB 通过将事务数据附加到Data.fs文件来写入事务数据,并且不会自动删除旧数据。它可以通过从存储中查找给定对象的最新版本来构造数据库的当前状态。这就是为什么你Data.fs的增长如此之快,PersistentList随着事务的提交,你正在编写越来越大的实例的新版本。

删除旧数据称为打包,类似于VACUUMPostgreSQL 和其他关系数据库中的命令。只需调用变量上.pack()方法db以删除所有旧版本,或使用该方法的tanddays参数来设置要保留多少历史记录的限制,第一个是time.time()时间戳(自纪元以来的秒数),您可以在此之前打包,并且days是从当前时间或t指定的过去保留的天数。随着旧事务中的部分列表被删除,打包应该会大大减少您的数据文件。请注意,打包是一项昂贵的操作,因此可能需要一段时间,具体取决于数据集的大小。

使用事务管理内存

您正在尝试构建一个非常大的数据集,通过使用持久性来解决内存限制,并使用事务来尝试将内容刷新到磁盘。但是,通常,使用事务提交信号,您已经完成了数据集的构建,您可以将其用作一个原子整体。

您需要在这里使用的是一个保存点。保存点本质上是子事务,在整个事务期间您可以请求将数据临时存储在磁盘上的一个点。当您提交事务时,它们将成为永久性的。要创建保存点,请在事务上调用.savepoint方法

for Gnodes in G.nodes():      # Gnodes iterates over 10000 values 
    Gvalue = someoperation(Gnodes)
    for Hnodes in H.nodes():  # Hnodes iterates over 10000 values 
        Hvalue =someoperation(Hnodes)
        score = SomeOperation on (Gvalue,Hvalue)
        btree_container.setdefault(Gnodes, PersistentList()).append(
            [Hnodes, score, -1 ])
    transaction.savepoint(True)
transaction.commit()

在上面的示例中,我将optimistic标志设置为 True,这意味着:我不打算回滚到此保存点;某些存储不支持回滚,并且发出不需要回滚的信号会使您的代码在这种情况下工作。

另请注意,transaction.commit()当整个数据集已被处理时会发生这种情况,这是提交应该实现的。

保存点所做的一件事是调用 ZODB 缓存的垃圾收集,这意味着当前未使用的任何数据都将从内存中删除。

注意那里的“当前未使用”部分;如果您的任何代码在变量中保留较大的值,则无法从内存中清除数据。据我可以从您向我们展示的代码中确定,这看起来不错。但是我不知道您的操作是如何工作的,也不知道您是如何生成节点的;例如,当迭代器会这样做时,请注意避免在内存中构建完整的列表,或者构建引用所有列表列表的大型字典。

您可以对创建保存点的位置进行一些试验;您可以在每次处理完一个时创建一个HNodes,或者仅在完成GNodes我上面所做的循环时创建一个。您正在构建一个列表 per GNodes,因此它会保留在内存中,同时循环遍历所有内容H.nodes(),并且刷新到磁盘可能只有在您完成完整构建后才有意义。

但是,如果您发现需要更频繁地清除内存,则应考虑使用BTrees.OOBTree.TreeSet类或BTrees.IOBTree.BTree类而不是 aPersistentList将数据分解为更持久的对象。ATreeSet是有序的,但不是(容易)可索引的,而 aBTree可以通过使用简单的递增索引键用作列表:

for i, Hnodes in enumerate(H.nodes()):
    ...
    btree_container.setdefault(Gnodes, IOBTree())[i] = [Hnodes, score, -1]
    if i % 100 == 0:
        transaction.savepoint(True)

上面的代码使用 BTree 而不是 PersistentList,并且每HNodes处理 100 个就创建一个保存点。因为 BTree 使用本身就是持久对象的存储桶,所以可以更轻松地将整个结构刷新到保存点,而不必留在内存中以供所有H.nodes()处理。

于 2012-06-29T11:00:30.700 回答
1

什么构成事务取决于您的应用程序中需要“原子”的内容。如果事务失败,它将回滚到之前的状态(就在最后一次提交之后)。从您的应用程序代码中可以看出,您希望为每个 Gnode 计算一个值。因此,您的提交可以像这样在 Gnodes 循环结束时进入:

for Gnodes in G.nodes():       # Gnodes iterates over 10000 values 
    Gvalue = someoperation(Gnodes)
    for Hnodes in H.nodes():   # Hnodes iterates over 10000 values 
        Hvalue =someoperation(Hnodes)
        score = SomeOperation on (Gvalue,Hvalue)
        btree_container.setdefault(Gnodes,[]).append([Hnodes, score, -1 ])
    # once we calculate the value for a Gnodes, commit
    transaction.commit()

从您的代码中可以看出,“Hvalue”组合不依赖于 Gvalue 或 Gnodes。如果它是一项昂贵的操作,即使它不影响其计算,您也要为每个 Gnode 计算 1000 次。所以,我会把它移出循环。

# Hnodes iterates over 10000 values
hvals = dict((Hnodes, someoperation(Hnodes)) for Hnodes in H.nodes())
# now you have mapping of Hnodes and Hvalues

for Gnodes in G.nodes():       # Gnodes iterates over 10000 values 
    Gvalue = someoperation(Gnodes)
    for Hnodes, Hvalue in hvals.iteritems(): 
        score = SomeOperation on (Gvalue,Hvalue)
        btree_container.setdefault(Gnodes,[]).append([Hnodes, score, -1 ])
    # once we calculate the value for a given Gnodes, commit
    transaction.commit()
于 2012-06-29T06:53:43.500 回答