我只是想用一些伪代码来澄清问题和解决方案,以防将来有人遇到这个问题/想要这样做。
class ObjA(object):
obj_c = relationship('ObjC', backref='obj_c')
class ObjB(object):
obj_c = relationship('ObjC', backref='obj_c')
class ObjC(object):
obj_a_id = Column(Integer, ForeignKey('obj_a.id'))
obj_b_id = Column(Integer, ForeignKey('obj_b.id'))
def __init__(self, obj_a, obj_b):
self.obj_a = obj_a
self.obj_b = obj_b
def make_a_bunch_of_c(obj_a, list_of_b=None):
return [ObjC(obj_a, obj_b) for obj_b in list_of_b]
def parallel_generate():
list_of_a = session.query(ObjA).all() # assume there are 1000 of these
list_of_b = session.query(ObjB).all() # and 30 of these
fxn = functools.partial(make_a_bunch_of_c, list_of_b=list_of_b)
pool = multiprocessing.Pool(10)
all_the_things = pool.map(fxn, list_of_a)
return all_the_things
现在让我们在这里停一下。最初的问题是尝试添加 ObjC 列表导致原始问题中的错误消息:
session.add_all(all_the_things)
AssertionError: A conflicting state is already present in the identity map for key [...]
注意:错误发生在添加阶段,提交尝试甚至从未发生,因为断言发生在提交前。据我所知。
解决方案:
all_the_things = parallel_generate()
for thing in all_the_things:
session.merge(thing)
session.commit()
处理自动添加的对象(通过关系级联)时会话利用的细节仍然超出我的能力范围,我无法解释最初发生冲突的原因。我所知道的是,使用合并函数将导致 SQLAlchemy 将跨 10 个不同进程创建的所有子对象排序到主进程中的单个会话中。
如果有人发生这种情况,我会很好奇为什么。