17

SQLAlchemy 是否有类似于 Django 的信号概念的东西?基本上,我想在预保存或保存一些实体对象时触发一些功能。谢谢。

编辑:我只想要在 SQLAlchemy 中等效的 django 信号。

4

5 回答 5

13

我认为您正在寻找“ORM 事件”。您可以在此处找到文档:

http://docs.sqlalchemy.org/en/latest/orm/events.html

于 2012-09-30T18:28:50.953 回答
6

您没有说清楚:您是要把 SQLAlchemy 与 Django 集成,还是只想在 SQLAlchemy 中获得与 django-signals 等效的功能。

如果您想要的是 Django 信号(例如 post_save、pre_save、pre_delete 等)的等效功能,我建议您参阅以下类的文档:

sqlalchemy.orm.interfaces.MapperExtension

于 2009-08-03T13:09:43.037 回答
2

这是我对这个问题的看法,它使用Louie来发送信号:

dispatch.py

"""
Signals dispatching for SQLAlchemy mappers.
"""

import louie
from sqlalchemy.orm.interfaces import MapperExtension
import signals


class LouieDispatcherExtension(MapperExtension):
    """
    Mapper extension that re-emits insert/update/delete hooks as louie signals.

    Every hook sends the matching class from the ``signals`` module, using
    the instance's class as the sender and passing the instance itself as
    the ``instance`` keyword argument. It then returns whatever the base
    ``MapperExtension`` hook returns.
    """

    def _broadcast(self, signal, instance):
        # Shared by all hooks: the sender is always the instance's class.
        louie.send(signal, instance.__class__, instance=instance)

    def after_insert(self, mapper, connection, instance):
        """Send ``signals.after_insert`` and defer to the base hook."""
        self._broadcast(signals.after_insert, instance)
        parent = super(LouieDispatcherExtension, self)
        return parent.after_insert(mapper, connection, instance)

    def after_delete(self, mapper, connection, instance):
        """Send ``signals.after_delete`` and defer to the base hook."""
        self._broadcast(signals.after_delete, instance)
        parent = super(LouieDispatcherExtension, self)
        return parent.after_delete(mapper, connection, instance)

    def after_update(self, mapper, connection, instance):
        """Send ``signals.after_update`` and defer to the base hook."""
        self._broadcast(signals.after_update, instance)
        parent = super(LouieDispatcherExtension, self)
        return parent.after_update(mapper, connection, instance)

    def before_delete(self, mapper, connection, instance):
        """Send ``signals.before_delete`` and defer to the base hook."""
        self._broadcast(signals.before_delete, instance)
        parent = super(LouieDispatcherExtension, self)
        return parent.before_delete(mapper, connection, instance)

    def before_insert(self, mapper, connection, instance):
        """Send ``signals.before_insert`` and defer to the base hook."""
        self._broadcast(signals.before_insert, instance)
        parent = super(LouieDispatcherExtension, self)
        return parent.before_insert(mapper, connection, instance)

    def before_update(self, mapper, connection, instance):
        """Send ``signals.before_update`` and defer to the base hook."""
        self._broadcast(signals.before_update, instance)
        parent = super(LouieDispatcherExtension, self)
        return parent.before_update(mapper, connection, instance)

signals.py

from louie import Signal


class after_delete(Signal):
    """Signal type used for the after-delete mapper hook."""


class after_insert(Signal):
    """Signal type used for the after-insert mapper hook."""


class after_update(Signal):
    """Signal type used for the after-update mapper hook."""


class before_delete(Signal):
    """Signal type used for the before-delete mapper hook."""


class before_insert(Signal):
    """Signal type used for the before-insert mapper hook."""


class before_update(Signal):
    """Signal type used for the before-update mapper hook."""

示例用法:

class MyModel(DeclarativeBase):
    """Example mapped class wired to LouieDispatcherExtension."""

    # Passing the extension through __mapper_args__ attaches it to this
    # class's mapper, so its insert/update/delete hooks run for MyModel.
    __mapper_args__ = {"extension": LouieDispatcherExtension()}

    # Integer primary-key column.
    ID = Column(Integer, primary_key=True)
    # String column limited to 255 characters.
    name = Column(String(255))

def on_insert(instance):
    """Example signal receiver: print a line naming the inserted instance.

    :param instance: the object passed by the dispatcher as the
        ``instance`` keyword argument.
    """
    # The call form with a single pre-formatted argument prints the same
    # text on Python 2 and Python 3; the bare ``print`` statement is a
    # SyntaxError on Python 3.
    print("inserted %s" % instance)

# Register on_insert as a receiver of signals.after_insert with MyModel as
# the sender, matching what LouieDispatcherExtension.after_insert sends.
louie.connect(on_insert, signals.after_insert, MyModel)
于 2010-12-27T15:38:22.963 回答
2

您可能还需要考虑sqlalchemy.orm.SessionExtension

下面是我拼凑出来的一些代码,用于在 Pylons 应用程序中为实例设置所有者 ID 并更新 update_date。ORMExt 类是核心逻辑所在,init_model 则是把它接入会话的地方。

import logging
import sqlalchemy as sa
from sqlalchemy import orm

from pylons import session

import datetime

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

class ORMSecurityException(Exception):
    """Raised when an ORM-layer security rule is violated."""

def _get_current_user():
    """Return the value stored under ``'user'`` in the Pylons session.

    The lookup and the session object itself are logged at debug level.
    NOTE(review): a subscript lookup presumably raises if the key is
    absent; confirm against the session implementation.
    """
    log.debug('getting current user from session...')
    log.debug(session)
    current_user = session['user']
    return current_user

def _is_admin(user):
    """Report whether *user* is an administrator.

    Stub: the argument is ignored and the answer is always ``False``.
    """
    is_admin = False
    return is_admin


def set_update_date(instance):
    """Stamp *instance* with the current local time, if it tracks one.

    Only objects that already expose an ``update_date`` attribute are
    touched; anything else is left unchanged.

    :param instance: any object, typically a mapped entity.
    """
    if hasattr(instance, 'update_date'):
        # Naive local time, as in the original snippet.
        instance.update_date = datetime.datetime.now()

def set_owner(instance):
    """Apply the ownership rules to *instance* before it is flushed.

    Objects without an ``owner_id`` attribute are not owned and are left
    alone. For owned objects:

    * with no current user, the operation is rejected;
    * an ``owner_id`` of ``None`` means a new object, which is assigned
      to the current user;
    * an ``owner_id`` that differs from the current user's ``id`` is
      rejected unless ``_is_admin`` approves the user;
    * otherwise the object is accepted unchanged.

    :param instance: the object about to be persisted.
    :raises ORMSecurityException: if there is no current user, or the
        object belongs to someone else and the user is not an admin.
    """
    log.info('set_owner')

    if not hasattr(instance, 'owner_id'):
        log.info('%s is not an owned object', instance.__class__.__name__)
        return

    log.info('instance.owner_id=%s', instance.owner_id)
    user = _get_current_user()
    if not user:
        # Anonymous users can't save owned objects. This check has to come
        # before any attribute access on ``user``, otherwise a missing user
        # surfaces as AttributeError instead of a security violation.
        raise ORMSecurityException()
    log.debug('user: %s', user.email)

    if instance.owner_id is None:
        # Must be a new object, thus owned by the current user.
        log.info('setting owner on object %s for user: %s',
                 instance.__class__.__name__, user.email)
        instance.owner_id = user.id
    elif instance.owner_id != user.id and not _is_admin(user):
        # Owner does not match the user and the user is not an admin.
        raise ORMSecurityException()
    else:
        # Reached for the owner and also for an admin acting on someone
        # else's object; the message is kept as in the original.
        log.info('object is already owned by this user')

def instance_policy(instance):
    """Run the per-instance policy: ownership first, then the update stamp.

    Calls ``set_owner`` and then ``set_update_date`` on *instance*,
    logging each step with the instance's class name.
    """
    class_name = instance.__class__.__name__

    log.info('setting owner for %s' % class_name)
    set_owner(instance)

    log.info('setting update_date for %s' % class_name)
    set_update_date(instance)


class ORMExt(orm.SessionExtension):
    """
    Session extension that runs a policy callable on every instance about
    to be flushed (deleted, new, or modified).
    """

    def __init__(self, policy):
        """
        :param policy: callable taking a single instance; it may mutate
            the instance or raise to veto the flush.
        """
        self._policy = policy

    def _apply_policy(self, action, instance):
        # Log which kind of change triggered the policy, then run it.
        log.info('running policy for %s %s', action,
                 instance.__class__.__name__)
        self._policy(instance)

    def before_flush(self, sqlsess, flush_context, instances):
        """
        Run the policy over ``sqlsess.deleted``, ``sqlsess.new`` and the
        modified members of ``sqlsess.dirty``, in that order.

        If anything raises, the error is logged, every object is expunged
        from the session, and the exception is re-raised.
        """
        try:
            for instance in sqlsess.deleted:
                self._apply_policy('deleted', instance)

            for instance in sqlsess.new:
                self._apply_policy('new', instance)

            for instance in sqlsess.dirty:
                # Members of ``dirty`` are only run through the policy
                # when is_modified() confirms a change.
                if sqlsess.is_modified(instance, include_collections=False,
                                       passive=True):
                    self._apply_policy('updated', instance)
        except Exception as ex:
            log.error(ex)
            sqlsess.expunge_all()
            # A bare ``raise`` keeps the original traceback.
            raise

def init_model(engine):
    """Bind the model to *engine*; call before using any table or class.

    Builds a session factory bound to *engine* with an ``ORMExt`` running
    ``instance_policy`` as its extension. Stores the engine on
    ``meta.engine`` and a scoped session built from the factory on
    ``meta.Session``.
    """
    policy_extension = ORMExt(instance_policy)
    session_factory = orm.sessionmaker(
        autoflush=True,
        autocommit=True,
        bind=engine,
        extension=policy_extension,
    )
    meta.engine = engine
    meta.Session = orm.scoped_session(session_factory)
于 2009-08-03T15:22:57.493 回答
0

您可以使用内部MapperExtension类:

class YourModel(db.Model):
    """Example model that keeps its mapper extension as an inner class."""

    class BaseExtension(MapperExtension):
        """Mapper hooks for YourModel; put custom logic in each method."""

        def before_insert(self, mapper, connection, instance):
            """Hook for the before-insert step of *instance*."""
            # do something here
            # A comment alone is not a valid method body; delegating to the
            # base hook makes the class parse and passes its result through.
            return MapperExtension.before_insert(
                self, mapper, connection, instance)

        def before_update(self, mapper, connection, instance):
            """Hook for the before-update step of *instance*."""
            # do something here
            return MapperExtension.before_update(
                self, mapper, connection, instance)

    # Attach the inner extension to this class's mapper.
    __mapper_args__ = { 'extension': BaseExtension() }

    # ....
于 2013-12-19T09:37:36.283 回答