6

由于需要处理,我想要以多线程方式生成成千上万个(可能是数百个)持久对象。

虽然对象的创建发生在单独的线程中(使用带有作用域会话的 Flask-SQLAlchemy 扩展顺便说一句),但将生成的对象写入数据库的调用发生在生成完成后的 1 个位置。

我认为,问题在于正在创建的对象是几个现有关系的一部分——从而触发了身份映射的自动添加,尽管是在单独的并发线程中创建的,并且在任何线程中都没有显式会话。

我希望将生成的对象包含在一个列表中,然后将整个列表(使用单个会话对象)写入数据库。这会导致如下错误:

AssertionError: A conflicting state is already present in the identity map for key (<class 'app.ModelObject'>, (1L,))

因此,为什么我相信身份映射已经被填充,因为当我尝试使用并发代码之外的全局会话添加和提交时,会触发断言错误。

最后的细节是无论会话对象(范围或其他,因为我不完全理解在多线程情况下自动添加到身份映射的工作原理)我找不到方法/不知道如何获得对它们的引用,这样即使我想为每个进程处理一个单独的会话,我也可以。

任何意见是极大的赞赏。我(还)没有发布代码的唯一原因是因为很难从我的应用程序中立即抽象出一个工作示例。如果有人真的需要看到它,我会发布。

4

2 回答 2

5

每个会话都是线程本地的;换句话说,每个线程都有一个单独的会话。如果您决定将某些实例传递给另一个线程,它们将与会话“分离”。在接收线程中使用db.session.add_all(objects)以将它们全部放回。

出于某种原因,您似乎正在不同线程中创建具有相同标识(主键列)的对象,然后尝试将它们都发送到数据库。一种选择是解决发生这种情况的原因,从而保证身份是唯一的。您也可以尝试合并merged_object = db.session.merge(other_object, load=False).

编辑:zzzeek 的评论让我了解可能发生的其他事情:

使用 Flask-SQLAlchemy,会话与应用上下文相关联。由于这是线程本地的,因此产生一个新线程将使上下文无效;线程中不会有数据库会话。所有实例都在那里分离,并且无法正确跟踪关系。一种解决方案是传递给每个线程并在一个块app内执行所有操作。with app.app_context():在块内,首先使用db.session.add传递的实例填充本地会话。之后您仍应合并到主任务中以确保一致性。

于 2013-06-26T13:15:17.300 回答
3

我只是想用一些伪代码来澄清问题和解决方案,以防将来有人遇到这个问题/想要这样做。

class ObjA(object):
    obj_c = relationship('ObjC', backref='obj_c')

class ObjB(object):
    obj_c = relationship('ObjC', backref='obj_c')

class ObjC(object):
    obj_a_id = Column(Integer, ForeignKey('obj_a.id'))
    obj_b_id = Column(Integer, ForeignKey('obj_b.id'))

    def __init__(self, obj_a, obj_b):
        self.obj_a = obj_a
        self.obj_b = obj_b


def make_a_bunch_of_c(obj_a, list_of_b=None):
    return [ObjC(obj_a, obj_b) for obj_b in list_of_b]

def parallel_generate():
   list_of_a = session.query(ObjA).all() # assume there are 1000 of these
   list_of_b = session.query(ObjB).all() # and 30 of these

   fxn = functools.partial(make_a_bunch_of_c, list_of_b=list_of_b)
   pool = multiprocessing.Pool(10)
   all_the_things = pool.map(fxn, list_of_a)
   return all_the_things

现在让我们在这里停一下。最初的问题是尝试添加 ObjC 列表导致原始问题中的错误消息:

session.add_all(all_the_things)

AssertionError: A conflicting state is already present in the identity map for key [...]

注意:错误发生在添加阶段,提交尝试甚至从未发生,因为断言发生在提交前。据我所知。

解决方案:

all_the_things = parallel_generate()
for thing in all_the_things:
    session.merge(thing)
session.commit()

处理自动添加的对象(通过关系级联)时会话利用的细节仍然超出我的能力范围,我无法解释最初发生冲突的原因。我所知道的是,使用合并函数将导致 SQLAlchemy 将跨 10 个不同进程创建的所有子对象排序到主进程中的单个会话中。

如果有人发生这种情况,我会很好奇为什么。

于 2013-06-28T04:33:22.763 回答