3

我的问题如下:

我有这两个模型Entry,并Tag通过 SQLAlchemy 中的多对多关系链接。现在我想删除删除后Tag没有任何对应的所有内容。EntryEntry

举例说明我想要什么:

  • Entry 1带标签pythonjava

  • Entry 2带标签pythonc++

有了这两个条目,数据库就包含了标签pythonjavac++。如果我现在删除Entry 2,我希望 SQLAlchemy 自动c++从数据库中删除标签。是否可以在Entry模型本身中定义这种行为,或者是否有更优雅的方式?

谢谢。

4

2 回答 2

3

不久前有人问过这个问题:在 SQLAlchemy 关系上设置 delete-orphan 会导致 AssertionError: This AttributeImpl is not configured to track parents

这就是“多对多孤儿”问题。jadkik94 很接近,因为您应该使用事件来捕获它,但我尝试建议不要在映射器事件中使用 Session,尽管它在这种情况下有效。

下面,我从另一个 SO 问题中逐字回答,并将“角色”一词替换为“条目”:

from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import event
from sqlalchemy.orm import attributes

Base= declarative_base()

tagging = Table('tagging',Base.metadata,
    Column('tag_id', Integer, ForeignKey('tag.id', ondelete='cascade'), primary_key=True),
    Column('entry_id', Integer, ForeignKey('entry.id', ondelete='cascade'), primary_key=True)
)

class Tag(Base):

    __tablename__ = 'tag'
    id = Column(Integer, primary_key=True)
    name = Column(String(100), unique=True, nullable=False)

    def __init__(self, name=None):
        self.name = name

class Entry(Base):
    __tablename__ = 'entry'

    id = Column(Integer, primary_key=True)
    tag_names = association_proxy('tags', 'name')

    tags = relationship('Tag',
                        secondary=tagging,
                        backref='entries')

@event.listens_for(Session, 'after_flush')
def delete_tag_orphans(session, ctx):
    # optional: look through Session state to see if we want
    # to emit a DELETE for orphan Tags
    flag = False

    for instance in session.dirty:
        if isinstance(instance, Entry) and \
            attributes.get_history(instance, 'tags').deleted:
            flag = True
            break
    for instance in session.deleted:
        if isinstance(instance, Entry):
            flag = True
            break

    # emit a DELETE for all orphan Tags.   This is safe to emit
    # regardless of "flag", if a less verbose approach is
    # desired.
    if flag:
        session.query(Tag).\
            filter(~Tag.entries.any()).\
            delete(synchronize_session=False)


e = create_engine("sqlite://", echo=True)

Base.metadata.create_all(e)

s = Session(e)

r1 = Entry()
r2 = Entry()
r3 = Entry()
t1, t2, t3, t4 = Tag("t1"), Tag("t2"), Tag("t3"), Tag("t4")

r1.tags.extend([t1, t2])
r2.tags.extend([t2, t3])
r3.tags.extend([t4])
s.add_all([r1, r2, r3])

assert s.query(Tag).count() == 4

r2.tags.remove(t2)

assert s.query(Tag).count() == 4

r1.tags.remove(t2)

assert s.query(Tag).count() == 3

r1.tags.remove(t1)

assert s.query(Tag).count() == 2

两个几乎相同的 SO 问题使这成为手头上的东西,所以我将它添加到http://www.sqlalchemy.org/trac/wiki/UsageRecipes/ManyToManyOrphan的 wiki 中。

于 2012-10-04T16:01:31.543 回答
1

我会让代码为我说话:

from sqlalchemy import create_engine, exc, event
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import func, Table, Column, Integer, String, Float, Boolean, MetaData, ForeignKey
from sqlalchemy.orm import relationship, backref

# Connection
engine = create_engine('sqlite:///', echo=True)
Base = declarative_base(bind=engine)
Session = sessionmaker(bind=engine)

# Models
entry_tag_link = Table('entry_tag', Base.metadata,
    Column('entry_id', Integer, ForeignKey('entries.id')),
    Column('tag_id', Integer, ForeignKey('tags.id'))
)

class Entry(Base):
    __tablename__ = 'entries'
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False, default='')

    tags = relationship("Tag", secondary=entry_tag_link, backref="entries")

    def __repr__(self):
        return '<Entry %s>' % (self.name,)

class Tag(Base):
    __tablename__ = 'tags'
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)

    def __repr__(self):
        return '<Tag %s>' % (self.name,)

# Delete listener
def delete_listener(mapper, connection, target):
    print "---- DELETING %s ----" % (target,)
    print '-' * 20
    for t in target.tags:
        if len(t.entries) == 0:
            print ' ' * 5, t, 'is to be deleted'
            session.delete(t)
    print '-' * 20

event.listen(Entry, 'before_delete', delete_listener)

# Utility functions
def dump(session):
    entries = session.query(Entry).all()
    tags = session.query(Tag).all()

    print '*' * 20
    print 'Entries', entries
    print 'Tags', tags
    print '*' * 20


Base.metadata.create_all()

session = Session()

t1, t2, t3 = Tag(name='python'), Tag(name='java'), Tag(name='c++')

e1, e2 = Entry(name='Entry 1', tags=[t1, t2]), Entry(name='Entry 2', tags=[t1, t3])

session.add_all([e1,e2])
session.commit()

dump(session)

raw_input("---- Press return to delete the second entry and see the result ----")

session.delete(e2)
session.commit()

dump(session)

上面的代码使用 SQLAlchemy ORM 事件的after_delete事件。这条线很神奇:

event.listen(Entry, 'before_delete', delete_listener)

这表示要监听一个Entry项目的所有删除,并调用我们的监听器,它会做我们想要的。但是,文档不建议更改事件中的会话(请参阅我添加的链接中的警告)。但据我所知,它有效,所以由您决定这是否适合您。

于 2012-09-29T18:01:57.083 回答