有没有办法为查询对象创建自定义方法,这样你就可以做这样的事情?
User.query.all_active()
all_active()
本质上在哪里.filter(User.is_active == True)
并且能够过滤掉它?
User.query.all_active().filter(User.age == 30)
有没有办法为查询对象创建自定义方法,这样你就可以做这样的事情?
User.query.all_active()
all_active()
本质上在哪里.filter(User.is_active == True)
并且能够过滤掉它?
User.query.all_active().filter(User.age == 30)
您可以对基Query
类进行子类化以添加自己的方法:
from sqlalchemy.orm import Query
class MyQuery(Query):
def all_active(self):
return self.filter(User.is_active == True)
然后,您在创建会话时告诉 SQLAlchemy 使用这个新的查询类(此处的文档)。从您的代码看来,您可能正在使用 Flask-SQLAlchemy,因此您可以按如下方式进行:
db = SQLAlchemy(session_options={'query_cls': MyQuery})
否则,您会将参数直接传递给sessionmaker
:
sessionmaker(bind=engine, query_cls=MyQuery)
到目前为止,这个新的查询对象并不那么有趣,因为我们User
在方法中对类进行了硬编码,所以它不适用于其他任何东西。更好的实现将使用查询的底层类来确定应用哪个过滤器。这有点棘手,但也可以做到:
class MyOtherQuery(Query):
def _get_models(self):
"""Returns the query's underlying model classes."""
if hasattr(query, 'attr'):
# we are dealing with a subquery
return [query.attr.target_mapper]
else:
return [
d['expr'].class_
for d in query.column_descriptions
if isinstance(d['expr'], Mapper)
]
def all_active(self):
model_class = self._get_models()[0]
return self.filter(model_class.is_active == True)
最后,动态关系(如果有的话)不会使用这个新的查询类。为了让那些也使用它,您可以在创建关系时将其作为参数传递:
users = relationship(..., query_class=MyOtherQuery)
这对我来说很好
class ParentQuery(Query):
def _get_models(self):
if hasattr(query, 'attr'):
return [query.attr.target_mapper]
else:
return self._mapper_zero().class_
def FilterByCustomer(self):
model_class = self._get_models()
return self.filter(model_class.customerId == int(g.customer.get('customerId')))
class AccountWorkflowModel(db.Model):
query_class = ParentQuery
.................
要提供一个自定义方法,该方法将被所有从特定父级继承的模型使用,首先如前所述从 Query 类继承:
from flask_sqlalchemy import SQLAlchemy, BaseQuery
from sqlalchemy.inspection import inspect
class MyCustomQuery(BaseQuery):
def all_active(self):
# get the class
modelClass = self._mapper_zero().class_
# get the primary key column
ins = inspect(modelClass)
# get a list of passing objects
passingObjs = []
for modelObj in self:
if modelObj.is_active == True:
# add to passing object list
passingObjs.append(modelObj.__dict__[ins.primary_key[0].name])
# change to tuple
passingObjs = tuple(passingObjs)
# run a filter on the query object
return self.filter(ins.primary_key[0].in_(passingObjs))
# add this to the constructor for your DB object
myDB = SQLAlchemy(query_class=MyCustomQuery)
这是针对flask-sqlalchemy的,人们在寻找这个答案时仍然会来到这里。