很多人都做“is_deleted”的事情,我同意我也不喜欢这样做,尽管我们在PreFilteredQuery确实有一个秘诀。
正如其他人所建议的那样,您正在寻找的是“版本控制”配方。我们有一个在 SQLAlchemy 文档的版本化对象中介绍的单独版本化表中存储数据副本的综合示例。
在这里,我调整了该示例中使用的一些技术,以生成一个更直接的配方,该配方专门仅跟踪“已删除”对象,并包括一个“恢复”功能,它将给定行“恢复”回主表。所以它不像“SQLAlchemy 允许我切换与特定对象关联的表”,它更像是创建了另一个映射类,它类似于主类,它也可以用来“反转”你请求的删除. 线下的所有内容__main__
都是概念验证。
from sqlalchemy.orm import Session, object_session
from sqlalchemy import event
def preserve_deleted(class_):
def copy_col(col):
newcol = col.copy()
newcol.constraints = set()
return newcol
keys = class_.__table__.c.keys()
cols = dict(
(col.key, copy_col(col)) for col in class_.__table__.c
)
cols['__tablename__'] = "%s_deleted" % class_.__table__.name
class History(object):
def restore(self):
sess = object_session(self)
sess.delete(self)
sess.add(copy_inst(self, class_))
hist_class = type(
'%sDeleted' % class_.__name__,
(History, Base),
cols)
def copy_inst(fromobj, tocls):
return tocls(**dict(
(key, getattr(fromobj, key))
for key in keys
))
@event.listens_for(Session, 'before_flush')
def check_deleted(session, flush_context, instances):
for del_ in session.deleted:
if isinstance(del_, class_):
h = copy_inst(del_, hist_class)
session.add(h)
class_.deleted = hist_class
return class_
if __name__ == '__main__':
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship, Session
from sqlalchemy import create_engine
Base = declarative_base()
@preserve_deleted
class A(Base):
__tablename__ = "a"
id = Column(Integer, primary_key=True)
data1 = Column(String)
data2 = Column(String)
@preserve_deleted
class B(Base):
__tablename__ = 'b'
id = Column(Integer, primary_key=True)
data1 = Column(String)
a_id = Column(Integer, ForeignKey('a.id'))
a = relationship("A")
e = create_engine('sqlite://', echo=True)
Base.metadata.create_all(e)
s = Session(e)
a1, a2, a3, a4 = \
A(data1='a1d1', data2='a1d2'),\
A(data1='a2d1', data2='a2d2'),\
A(data1='a3d1', data2='a3d2'),\
A(data1='a4d1', data2='a4d2')
b1, b2, b3, b4 = \
B(data1='b1', a=a1),\
B(data1='b2', a=a1),\
B(data1='b3', a=a3),\
B(data1='b4', a=a4)
s.add_all([
a1, a2, a3, a4,
b1, b2, b3, b4
])
s.commit()
assert s.query(A.id).order_by(A.id).all() == [(1, ), (2, ), (3, ), (4, )]
assert s.query(B.id).order_by(B.id).all() == [(1, ), (2, ), (3, ), (4, )]
s.delete(a2)
s.delete(b2)
s.delete(b3)
s.delete(a3)
s.commit()
assert s.query(A.id).order_by(A.id).all() == [(1, ), (4, )]
assert s.query(B.id).order_by(B.id).all() == [(1, ), (4, )]
a2_deleted = s.query(A.deleted).filter(A.deleted.id == 2).one()
a2_deleted.restore()
b3_deleted = s.query(B.deleted).filter(B.deleted.id == 3).one()
a3_deleted = s.query(A.deleted).filter(A.deleted.id == 3).one()
b3_deleted.restore()
a3_deleted.restore()
s.commit()
assert s.query(A.id).order_by(A.id).all() == [(1, ), (2, ), (3, ), (4, )]
assert s.query(B.id).order_by(B.id).all() == [(1, ), (3, ), (4, )]