SQLAlchemy 是否有类似于 Django 的信号概念的东西?基本上,我想在预保存或保存一些实体对象时触发一些功能。谢谢。
编辑:我只想要在 SQLAlchemy 中等效的 django 信号。
SQLAlchemy 是否有类似于 Django 的信号概念的东西?基本上,我想在预保存或保存一些实体对象时触发一些功能。谢谢。
编辑:我只想要在 SQLAlchemy 中等效的 django 信号。
我认为您正在寻找“ORM 事件”(ORM Events)。您可以在 SQLAlchemy 官方文档的“ORM Events”一节中找到相关说明。
您没有明确说明,您是在集成 SQLAlchemy 和 Django,还是只想要在 SQLAlchemy 中等效于 django-signals。
如果您想要等效的 Django 信号,例如 post_save、pre_save、pre_delete 等,我会向您推荐该页面(原文中的链接在此处已缺失)。
这是我对这个问题的看法,它使用Louie来发送信号:
dispatch.py
"""
Signals dispatching for SQLAlchemy mappers.
"""
import louie
from sqlalchemy.orm.interfaces import MapperExtension
import signals
class LouieDispatcherExtension(MapperExtension):
    """Mapper extension that broadcasts a louie signal from each hook.

    Every hook sends the signal class of the same name from ``signals``,
    using the instance's class as the sender and passing the instance as the
    ``instance`` keyword argument.  The hook then returns whatever the
    ``MapperExtension`` implementation of that hook returns.
    """

    def _emit(self, signal, instance):
        """Send ``signal`` through louie on behalf of ``instance``'s class."""
        louie.send(signal, instance.__class__, instance=instance)

    def after_insert(self, mapper, connection, instance):
        self._emit(signals.after_insert, instance)
        parent = super(LouieDispatcherExtension, self)
        return parent.after_insert(mapper, connection, instance)

    def after_delete(self, mapper, connection, instance):
        self._emit(signals.after_delete, instance)
        parent = super(LouieDispatcherExtension, self)
        return parent.after_delete(mapper, connection, instance)

    def after_update(self, mapper, connection, instance):
        self._emit(signals.after_update, instance)
        parent = super(LouieDispatcherExtension, self)
        return parent.after_update(mapper, connection, instance)

    def before_delete(self, mapper, connection, instance):
        self._emit(signals.before_delete, instance)
        parent = super(LouieDispatcherExtension, self)
        return parent.before_delete(mapper, connection, instance)

    def before_insert(self, mapper, connection, instance):
        self._emit(signals.before_insert, instance)
        parent = super(LouieDispatcherExtension, self)
        return parent.before_insert(mapper, connection, instance)

    def before_update(self, mapper, connection, instance):
        self._emit(signals.before_update, instance)
        parent = super(LouieDispatcherExtension, self)
        return parent.before_update(mapper, connection, instance)
signals.py
from louie import Signal
class after_delete(Signal):
    """Signal class sent from the dispatcher's ``after_delete`` hook."""


class after_insert(Signal):
    """Signal class sent from the dispatcher's ``after_insert`` hook."""


class after_update(Signal):
    """Signal class sent from the dispatcher's ``after_update`` hook."""


class before_delete(Signal):
    """Signal class sent from the dispatcher's ``before_delete`` hook."""


class before_insert(Signal):
    """Signal class sent from the dispatcher's ``before_insert`` hook."""


class before_update(Signal):
    """Signal class sent from the dispatcher's ``before_update`` hook."""
示例用法:
class MyModel(DeclarativeBase):
    """Example mapped class whose mapper uses ``LouieDispatcherExtension``."""

    # Attach the signal-dispatching extension to this class's mapper.
    __mapper_args__ = dict(extension=LouieDispatcherExtension())

    ID = Column(Integer, primary_key=True)
    name = Column(String(255))
def on_insert(instance):
    """Example signal receiver: print a line naming the inserted instance.

    Args:
        instance: the object delivered with the signal.
    """
    # print() with a single argument behaves the same on Python 2 and 3.
    # Wrapping the instance in a 1-tuple keeps %-formatting correct even when
    # the instance is itself a tuple.
    print("inserted %s" % (instance,))
# Register on_insert as the receiver for signals.after_insert sent by MyModel.
louie.connect(on_insert, signals.after_insert, MyModel)
您可能还需要考虑 sqlalchemy.orm.SessionExtension。
这是我拼凑在一起的一些代码,用于在 Pylons 应用程序中为实例设置所有者 ID 并设置 update_date。ORMExt 类是所有魔法发生的地方,init_model 则是把它接入会话的地方。
import logging
import sqlalchemy as sa
from sqlalchemy import orm
from pylons import session
import datetime
log = logging.getLogger(__name__)
class ORMSecurityException(Exception):
    """Raised for security violations detected in the ORM layer."""
def _get_current_user():
    """Return the ``'user'`` entry of the pylons session.

    The lookup and the session object itself are logged at debug level first.
    """
    log.debug('getting current user from session...')
    log.debug(session)
    # NOTE(review): presumably raises KeyError when no user is stored in the
    # session -- confirm against the session implementation.
    current_user = session['user']
    return current_user
def _is_admin(user):
return False
def set_update_date(instance):
    """Set ``instance.update_date`` to the current time, if it has that attribute.

    Objects without an ``update_date`` attribute are left untouched.
    """
    if not hasattr(instance, 'update_date'):
        return
    instance.update_date = datetime.datetime.now()
def set_owner(instance):
    """Apply the ownership rules to ``instance`` when it has an ``owner_id``.

    Objects without an ``owner_id`` attribute are not owned and are skipped.
    For owned objects:

    * no current user          -> ``ORMSecurityException``
    * ``owner_id`` is ``None`` -> the current user becomes the owner
    * ``owner_id`` differs from the current user's id and the user is not
      an admin                 -> ``ORMSecurityException``
    * otherwise                -> nothing to change

    Raises:
        ORMSecurityException: for an anonymous user, or a non-admin user
            touching an object owned by someone else.
    """
    log.info('set_owner')
    if not hasattr(instance, 'owner_id'):
        log.info('%s is not an owned object' % instance.__class__.__name__)
        return

    log.info('instance.owner_id=%s' % instance.owner_id)
    u = _get_current_user()
    if not u:
        # Anonymous users can't save owned objects.  This check has to run
        # before anything dereferences ``u`` (the debug log below reads
        # ``u.email``), otherwise a missing user fails with AttributeError
        # instead of the intended security exception.
        raise ORMSecurityException()
    log.debug('user: %s' % u.email)

    if instance.owner_id is None:
        # Must be a new object, thus owned by the current user.
        log.info('setting owner on object %s for user: %s' % (instance.__class__.__name__,u.email))
        instance.owner_id = u.id
    elif instance.owner_id != u.id and not _is_admin(u):
        # owner_id does not match the user and the user is not an admin:
        # violation.
        raise ORMSecurityException()
    else:
        log.info('object is already owned by this user')
def instance_policy(instance):
    """Run the ownership check, then refresh the update date, logging each step."""
    class_name = instance.__class__.__name__
    log.info('setting owner for %s' % class_name)
    set_owner(instance)
    log.info('setting update_date for %s' % class_name)
    set_update_date(instance)
class ORMExt(orm.SessionExtension):
    """Session extension that runs a policy callable on instances before flush.

    The policy is called with a single instance; any exception it raises
    aborts the flush (see ``before_flush``).
    """

    def __init__(self, policy):
        """Store ``policy``, a callable taking one instance."""
        self._policy = policy

    def _run_policy(self, instance, state):
        """Log and apply the policy to one instance; ``state`` labels the log line."""
        log.info('running policy for %s %s' % (state, instance.__class__.__name__))
        self._policy(instance)

    def before_flush(self, sqlsess, flush_context, instances):
        """Apply the policy to deleted, new and modified instances of ``sqlsess``.

        Dirty instances are only processed when ``sqlsess.is_modified``
        reports a change.  ``flush_context`` and ``instances`` are unused.

        Raises:
            Exception: whatever the policy (or the session) raised.  The
                error is logged, the session is emptied with
                ``expunge_all()``, and the exception is re-raised.
        """
        try:
            for instance in sqlsess.deleted:
                self._run_policy(instance, 'deleted')
            for instance in sqlsess.new:
                self._run_policy(instance, 'new')
            for instance in sqlsess.dirty:
                if sqlsess.is_modified(instance, include_collections=False, passive=True):
                    self._run_policy(instance, 'updated')
        except Exception as ex:
            # ``except ... as`` works on Python 2.6+ and 3.  The bare ``raise``
            # keeps the original traceback, which ``raise ex`` would reset on
            # Python 2.
            log.error(ex)
            sqlsess.expunge_all()
            raise
def init_model(engine):
    """Bind the model to ``engine``; call before using any table or class.

    Builds a session factory that carries an ``ORMExt`` running
    ``instance_policy``, and stores the engine and a scoped session on
    ``meta``.
    """
    flush_hook = ORMExt(instance_policy)
    session_factory = orm.sessionmaker(
        autoflush=True,
        autocommit=True,
        bind=engine,
        extension=flush_hook,
    )
    # NOTE(review): ``meta`` is not imported in the visible snippet --
    # presumably the project's model ``meta`` module; confirm.
    meta.engine = engine
    meta.Session = orm.scoped_session(session_factory)
您可以使用内部 MapperExtension 类:
class YourModel(db.Model):
    """Example model that declares its mapper extension as a nested class."""

    class BaseExtension(MapperExtension):
        """Mapper hooks for ``YourModel``; fill in the bodies as needed."""

        def before_insert(self, mapper, connection, instance):
            # do something here
            # ``pass`` keeps the placeholder valid Python: a body made only
            # of comments is a syntax error.
            pass

        def before_update(self, mapper, connection, instance):
            # do something here
            pass

    __mapper_args__ = {'extension': BaseExtension()}
    # ....