12

这与另一个超过 3 年的问题非常相似:查看 SQLAlchemy 事务、完成身份验证的用户等的一般方法是什么?

我正在开发一个应用程序，需要记录对特定表的所有更改。目前已有一个非常好的版本控制“配方”（recipe），但我需要对其进行修改，以便记录更改发生的日期时间以及进行更改的用户 ID。我采用了 SQLAlchemy 附带的 history_meta.py 示例，并让它记录时间而不是版本号，但我不知道如何把用户 ID 传进去。

我上面提到的那个问题建议把用户 ID 放在会话对象中。这很有道理，但我不知道该怎么做。我尝试过一种简单的做法，例如 session.userid = authenticated_userid(request)，但在 history_meta.py 中，该属性似乎已经不在会话对象上了。

我是在 Pyramid 框架中做这些事情的，所使用的会话对象定义为 DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))。在视图中，我先执行 session = DBSession()，然后继续使用 session。（我不确定这一步是否必要，但目前的做法就是这样。）

这是我修改后的 history_meta.py,以防有人发现它有用:

from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import mapper, class_mapper, attributes, object_mapper
from sqlalchemy.orm.exc import UnmappedClassError, UnmappedColumnError
from sqlalchemy import Table, Column, ForeignKeyConstraint, DateTime
from sqlalchemy import event
from sqlalchemy.orm.properties import RelationshipProperty
from datetime import datetime

def col_references_table(col, table):
    """Return True if any foreign key attached to ``col`` references ``table``.

    ``col`` must expose a ``foreign_keys`` iterable whose items provide a
    ``references(table)`` method; the result is False when there are no
    foreign keys or none of them matches.
    """
    return any(fk.references(table) for fk in col.foreign_keys)

def _history_mapper(local_mapper):
    """Create and attach a history mapper for the class mapped by ``local_mapper``.

    Depending on the inheritance layout this either builds a new
    ``<table name>_history`` table or appends any missing columns to the
    parent's history table. It then maps a generated ``<ClassName>History``
    class, stores the resulting mapper on ``cls.__history_mapper__`` and, when
    no parent history mapper exists, adds a ``version_datetime`` column and
    property to the versioned table itself.

    :param local_mapper: the mapper of the class that is being versioned.
    """
    cls = local_mapper.class_

    # set the "active_history" flag
    # on column-mapped attributes so that the old version
    # of the info is always loaded (currently sets it on all attributes)
    for prop in local_mapper.iterate_properties:
        getattr(local_mapper.class_, prop.key).impl.active_history = True

    super_mapper = local_mapper.inherits
    # Any history mapper already reachable through ``cls`` (e.g. one set on a
    # parent class) is treated as the parent history mapper.
    super_history_mapper = getattr(cls, '__history_mapper__', None)

    polymorphic_on = None
    # (local column key, referenced history column) pairs used to build the
    # foreign key from this history table to the parent history table.
    super_fks = []
    if not super_mapper or local_mapper.local_table is not super_mapper.local_table:
        # No parent mapper, or the class has a table of its own: build a
        # dedicated history table from copies of the local columns.
        cols = []
        for column in local_mapper.local_table.c:
            # version_datetime is added explicitly below (as part of the
            # primary key), so it is not copied from the versioned table.
            if column.name == 'version_datetime':
                continue

            col = column.copy()
            # The copy's unique flag is cleared for the history table.
            col.unique = False

            if super_mapper and col_references_table(column, super_mapper.local_table):
                super_fks.append((col.key, list(super_history_mapper.local_table.primary_key)[0]))

            cols.append(col)

            if column is local_mapper.polymorphic_on:
                polymorphic_on = col

        # NOTE(review): both branches append an identical version_datetime
        # column; only the extra foreign-key pair differs.
        if super_mapper:
            super_fks.append(('version_datetime', super_history_mapper.base_mapper.local_table.c.version_datetime))
            cols.append(Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=True))
        else:
            cols.append(Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=True))

        if super_fks:
            # zip(*pairs) splits the pairs into (local keys, referenced columns).
            cols.append(ForeignKeyConstraint(*zip(*super_fks)))

        table = Table(local_mapper.local_table.name + '_history', local_mapper.local_table.metadata,
           *cols
        )
    else:
        # single table inheritance.  take any additional columns that may have
        # been added and add them to the history table.
        for column in local_mapper.local_table.c:
            if column.key not in super_history_mapper.local_table.c:
                col = column.copy()
                col.unique = False
                super_history_mapper.local_table.append_column(col)
        # No new table: the history class is mapped with table=None below.
        table = None

    # The generated history class derives from the parent's history class when
    # there is one, otherwise from the bases of the base mapper's class.
    if super_history_mapper:
        bases = (super_history_mapper.class_,)
    else:
        bases = local_mapper.base_mapper.class_.__bases__
    versioned_cls = type.__new__(type, "%sHistory" % cls.__name__, bases, {})

    m = mapper(
            versioned_cls,
            table,
            inherits=super_history_mapper,
            polymorphic_on=polymorphic_on,
            polymorphic_identity=local_mapper.polymorphic_identity
            )
    cls.__history_mapper__ = m

    if not super_history_mapper:
        # Only when there is no parent history mapper: give the versioned
        # table its own (non-primary-key) version_datetime column and map it.
        local_mapper.local_table.append_column(
            Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=False)
        )
        local_mapper.add_property("version_datetime", local_mapper.local_table.c.version_datetime)


class Versioned(object):
    """Mixin whose ``__mapper_cls__`` also sets up history tracking.

    The mapper factory it supplies builds the regular mapper and then passes
    it to ``_history_mapper`` before returning it.
    """

    @declared_attr
    def __mapper_cls__(cls):
        def build_versioned_mapper(cls, *args, **kwargs):
            # Create the ordinary mapper first, then attach its history mapper.
            versioned_mapper = mapper(cls, *args, **kwargs)
            _history_mapper(versioned_mapper)
            return versioned_mapper

        return build_versioned_mapper


def versioned_objects(iter):
    """Lazily yield the items of ``iter`` that have a ``__history_mapper__``.

    Items without that attribute are skipped; order is preserved.
    """
    for candidate in iter:
        if not hasattr(candidate, '__history_mapper__'):
            continue
        yield candidate

def create_version(obj, session, deleted=False):
    """Add a history row holding the previous state of ``obj`` to ``session``.

    For every column of the object's history table(s) the value is taken from
    the attribute history: the deleted (old) value if the attribute changed,
    otherwise the unchanged value, otherwise the added value. If no column and
    no relationship changed and ``deleted`` is false, nothing is recorded.

    When a row is recorded it carries the object's current
    ``version_datetime``, and the object is then stamped with
    ``datetime.now()``.

    :param obj: a mapped instance whose class has a ``__history_mapper__``.
    :param session: the session the new history instance is added to.
    :param deleted: when true, record a history row even if nothing changed.
    """
    obj_mapper = object_mapper(obj)
    history_mapper = obj.__history_mapper__
    history_cls = history_mapper.class_

    obj_state = attributes.instance_state(obj)

    # column key -> value to store on the history row
    attr = {}

    obj_changed = False

    # Walk the object's mappers and the history mappers in step, up to the root.
    for om, hm in zip(obj_mapper.iterate_to_root(), history_mapper.iterate_to_root()):
        if hm.single:
            continue

        for hist_col in hm.local_table.c:
            # version_datetime is filled in separately after the loop.
            if hist_col.key == 'version_datetime':
                continue

            obj_col = om.local_table.c[hist_col.key]

            # get the value of the
            # attribute based on the MapperProperty related to the
            # mapped column.  this will allow usage of MapperProperties
            # that have a different keyname than that of the mapped column.
            try:
                prop = obj_mapper.get_property_by_column(obj_col)
            except UnmappedColumnError:
                # in the case of single table inheritance, there may be
                # columns on the mapped table intended for the subclass only.
                # the "unmapped" status of the subclass column on the
                # base class is a feature of the declarative module as of sqla 0.5.2.
                continue

            # expired object attributes and also deferred cols might not be in the
            # dict.  force it to load no matter what by using getattr().
            if prop.key not in obj_state.dict:
                getattr(obj, prop.key)

            a, u, d = attributes.get_history(obj, prop.key)

            if d:
                attr[hist_col.key] = d[0]
                obj_changed = True
            elif u:
                attr[hist_col.key] = u[0]
            elif a:
                # if the attribute had no value.
                attr[hist_col.key] = a[0]
                obj_changed = True
            # With no history at all the column is simply left unset on the
            # history row; indexing a[0] here would raise IndexError.

    if not obj_changed:
        # not changed, but we have relationships.  OK
        # check those too
        for prop in obj_mapper.iterate_properties:
            if isinstance(prop, RelationshipProperty) and \
                    attributes.get_history(obj, prop.key).has_changes():
                obj_changed = True
                break

    if not obj_changed and not deleted:
        return

    # The history row keeps the timestamp the object carried before this flush.
    attr['version_datetime'] = obj.version_datetime
    hist = history_cls()
    for key, value in attr.items():
        setattr(hist, key, value)
    session.add(hist)
    # Stamp the live object with the time of this change.
    obj.version_datetime = datetime.now()

def versioned_session(session):
    """Attach a ``before_flush`` listener to ``session`` that writes history rows.

    On each flush the listener calls ``create_version`` for every versioned
    object in ``session.dirty`` and then, with ``deleted=True``, for every
    versioned object in ``session.deleted``.
    """
    @event.listens_for(session, 'before_flush')
    def record_versions(session, flush_context, instances):
        # Dirty objects are handled first, then deleted ones; each collection
        # is read from the session only when its turn comes.
        for collection_name, is_deleted in (('dirty', False), ('deleted', True)):
            pending = getattr(session, collection_name)
            for versioned in versioned_objects(pending):
                create_version(versioned, session, deleted=is_deleted)

更新：好的，看来在 before_flush() 方法中，我拿到的会话类型是 sqlalchemy.orm.session.Session，而我附加 user_id 的那个会话类型是 sqlalchemy.orm.scoping.scoped_session。也就是说，在某个环节，外层的包装对象被剥离了。把 user_id 赋给 scoped_session 内部的 Session 是否安全？我能确定它不会被用于其他请求吗？

4

2 回答 2

3

老问题,但仍然非常相关。

您应该避免把 Web 会话信息放进数据库会话中。这样做会把互不相关的关注点混在一起，而它们各自的生命周期并不一致。下面是我在 Flask 中配合 SQLAlchemy 使用的一种方法（用的不是 Flask-SQLAlchemy，但在它上面应该同样有效）。我尽量在注释中标出了 Pyramid 下需要改动的地方。

from flask import has_request_context  # How to check if in a Flask session
from sqlalchemy import inspect
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm.attributes import get_history
from sqlalchemy.event import listen

from YOUR_SESSION_MANAGER import get_user  # This would be something in Pyramid
from my_project import models  # Where your models are defined

def get_object_changes(obj):
    """Collect the pending column changes on a model instance.

    Returns a dict keyed by attribute name; each value is a dict with the
    keys ``'before'`` (the last deleted value in the attribute history) and
    ``'after'`` (the current attribute value). An attribute is reported only
    if its history has changes, a deleted value exists, the two values
    differ, and at least one of them is truthy.
    """
    state = inspect(obj)
    pending = {}
    for column_attr in class_mapper(obj.__class__).column_attrs:
        key = column_attr.key
        if not getattr(state.attrs, key).history.has_changes():
            continue

        # Index 2 of the history is the "deleted" (previous) values.
        deleted = get_history(obj, key)[2]
        if not deleted:
            continue

        previous = deleted[-1]
        current = getattr(obj, key)
        if previous != current and (previous or current):
            pending[key] = {'before': previous, 'after': current}
    return pending

def my_model_change_listener(mapper, connection, target):
    """Template listener: gather the changes on ``target`` and the acting user.

    Computes the pending changes via ``get_object_changes`` (minus fields that
    should not be tracked) and, when inside a Flask request context, the
    ``id`` of the user returned by ``get_user()``. What to do with that
    information is left to the implementer.
    """
    changes = get_object_changes(target)
    # Drop fields you don't want to track.
    changes.pop("modify_ts", None)

    # Only look up the active user inside a request context; otherwise the
    # id stays None.
    user_id = getattr(get_user(), 'id', None) if has_request_context() else None

    if user_id is None:
        # Decide here what should happen when the user can't be determined.
        pass

    # At this point the model instance (target), the id of the logged-in
    # user (user_id) and the dictionary of changes are all available.

    # Either do something "quick" with them here or hand them to an async
    # task (e.g. Celery) if the processing may take longer than the request
    # should.

# Register the listener for the 'after_update' event of models.MyModel.
# Repeat this call for every model class whose changes should be tracked.
listen(models.MyModel, 'after_update', my_model_change_listener)
于 2019-10-01T17:24:12.650 回答
0

经过一堆摆弄后,我似乎能够通过执行以下操作在 scoped_session 内的会话对象上设置值:

# Session factory, defined the same way as in the question.
DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
session = DBSession()
# NOTE(review): `registry` is looked up on `session` here; presumably the
# intent is to reach the underlying Session held by the scoped_session.
# Confirm which object type `session` actually is at this point.
inner_session = session.registry()
# Placeholder value; presumably the authenticated user's id in real code.
inner_session.user_id = "test"
# Install the before_flush versioning hook.
versioned_session(session)

现在，在 history_meta.py 中传入的 session 对象上就带有我设置的 user_id 属性了。不过我有点担心这是否是正确的做法，因为注册表中的对象是线程本地（thread-local）对象，而线程会被不同的 HTTP 请求重复使用。

于 2013-04-11T16:54:04.833 回答