35

有没有办法为查询对象创建自定义方法,这样你就可以做这样的事情?

User.query.all_active()

all_active()本质上在哪里.filter(User.is_active == True)

并且能够过滤掉它?

User.query.all_active().filter(User.age == 30)
4

3 回答 3

60

您可以对基Query类进行子类化以添加自己的方法:

from sqlalchemy.orm import Query

class MyQuery(Query):

  def all_active(self):
    return self.filter(User.is_active == True)

然后,您在创建会话时告诉 SQLAlchemy 使用这个新的查询类(此处的文档)。从您的代码看来,您可能正在使用 Flask-SQLAlchemy,因此您可以按如下方式进行:

db = SQLAlchemy(session_options={'query_cls': MyQuery})

否则,您会将参数直接传递给sessionmaker

sessionmaker(bind=engine, query_cls=MyQuery)

到目前为止,这个新的查询对象并不那么有趣,因为我们User在方法中对类进行了硬编码,所以它不适用于其他任何东西。更好的实现将使用查询的底层类来确定应用哪个过滤器。这有点棘手,但也可以做到:

class MyOtherQuery(Query):

  def _get_models(self):
    """Returns the query's underlying model classes."""
    if hasattr(query, 'attr'):
      # we are dealing with a subquery
      return [query.attr.target_mapper]
    else:
      return [
        d['expr'].class_
        for d in query.column_descriptions
        if isinstance(d['expr'], Mapper)
      ]

  def all_active(self):
    model_class = self._get_models()[0]
    return self.filter(model_class.is_active == True)

最后,动态关系(如果有的话)不会使用这个新的查询类。为了让那些也使用它,您可以在创建关系时将其作为参数传递:

users = relationship(..., query_class=MyOtherQuery)
于 2013-04-12T02:22:59.033 回答
2

这对我来说很好

class ParentQuery(Query):
    def _get_models(self):     
        if hasattr(query, 'attr'):
            return [query.attr.target_mapper]
        else:
            return self._mapper_zero().class_

    def FilterByCustomer(self):
        model_class = self._get_models()
        return self.filter(model_class.customerId == int(g.customer.get('customerId')))


class AccountWorkflowModel(db.Model):
    query_class = ParentQuery
    .................
于 2020-04-18T07:40:22.143 回答
1

要提供一个自定义方法,该方法将被所有从特定父级继承的模型使用,首先如前所述从 Query 类继承:

from flask_sqlalchemy import SQLAlchemy, BaseQuery
from sqlalchemy.inspection import inspect

class MyCustomQuery(BaseQuery):
    def all_active(self):
        # get the class
        modelClass = self._mapper_zero().class_
        # get the primary key column
        ins = inspect(modelClass)
        # get a list of passing objects
        passingObjs = []
        for modelObj in self:
            if modelObj.is_active == True:
                # add to passing object list
                passingObjs.append(modelObj.__dict__[ins.primary_key[0].name])
        # change to tuple
        passingObjs = tuple(passingObjs)
        # run a filter on the query object
        return self.filter(ins.primary_key[0].in_(passingObjs))

# add this to the constructor for your DB object
myDB = SQLAlchemy(query_class=MyCustomQuery)

这是针对flask-sqlalchemy的,人们在寻找这个答案时仍然会来到这里。

于 2019-09-23T07:47:22.140 回答