很多时候,我写了如下查询:
pony.orm.select(u for u in User if u.available and u.friends > 0 and ...)
所以,我想写我自己的版本select
,它的替代品。就像避免我每次都写谓词的第一部分一样,if u.available and u.friends > 0
.
我的问题更笼统:我如何编写一个像select
它接受select
方法或count
方法可以接受的参数的函数。
很多时候,我写了如下查询:
pony.orm.select(u for u in User if u.available and u.friends > 0 and ...)
所以,我想写我自己的版本select
,它的替代品。就像避免我每次都写谓词的第一部分一样,if u.available and u.friends > 0
.
我的问题更笼统:我如何编写一个像select
它接受select
方法或count
方法可以接受的参数的函数。
User
让我们通过以下方式定义一个实体:
from datetime import datetime, timedelta
from pony.orm import *
db = Database('sqlite', ':memory:')
class User(db.Entity):
username = Required(str, unique=True)
password = Required(str)
friends = Set("User", reverse='friends') # many-to-many symmetric relation
online = Required(bool, default=False) # if user is currently online
last_visit = Optional(datetime) # last visit time
disabled = Required(bool, default=False) # if user is disabled by administrator
sql_debug(True)
db.generate_mapping(create_tables=True)
现在我们可以定义一些方便的函数来检索最常用的用户类型。第一个函数将返回未被管理员禁用的用户:
def active_users():
return User.select(lambda user: not user.disabled)
在该函数中,我使用接受 lambda 函数的实体select
方法,但可以使用接受生成器表达式的全局函数编写相同的函数:User
select
def active_users():
return select(u for u in User if not user.disabled)
函数的结果active_users
是一个查询对象。您可以调用filter
查询对象的方法来生成更具体的查询。例如,我可以使用active_users
函数来选择名称以“A”字母开头的活动用户:
users = active_users().filter(lambda user: user.name.startswith('A')) \
.order_by(User.name)[:10]
现在我想找到最近几天访问该站点的用户。我可以定义另一个函数,该函数使用从前一个函数返回的查询,并通过以下方式对其进行扩充:
def recent_users(days=1):
return active_users().filter(lambda u: u.last_visit > datetime.now() - timedelta(days))
在这个例子中,我将days
参数传递给函数并在过滤器中使用它的值。
您可以定义一组这样的函数,这些函数将形成应用程序的数据访问层。更多示例:
def users_with_at_least_n_friends(n=1):
return active_users().filter(lambda u: count(u.friends) >= n)
def online_users():
return User.select(lambda u: u.online)
def online_users_with_at_least_n_online_friends(n=1):
return online_users().filter(lambda u: count(f for f in u.friends if f.online) >= n)
users = online_users_with_at_least_n_online_friends(n=10) \
.order_by(User.name)[:10]
在上面的示例中,我定义了全局函数。另一种选择是将这些函数定义为User
实体的类方法:
class User(db.Entity):
username = Required(str, unique=True)
...
@classmethod
def name_starts_with(cls, prefix):
return cls.select(lambda user: not user.disabled
and user.name.startswith(prefix))
...
users = User.name_starts_with('A').order_by(desc(User.last_visit))[:10]
如果您可能想要一个可以应用于不同实体类的通用函数,那么您需要将实体类作为参数传递。例如,如果许多不同的类具有该deleted
属性,并且您希望有一个通用方法来仅选择未删除的对象,则可以编写如下内容:
def select_active(cls):
return cls.select(lambda obj: not obj.deleted)
select_active(Message).filter(lambda msg: msg.author == current_user)
上面提供的所有函数都有一个缺点——它们是不可组合的。您无法从一个函数获取查询并使用另一个函数对其进行扩充。如果您想要一个可以扩充现有查询的函数,该函数应该接受该查询作为参数。例子:
def active_users():
return User.select(lambda user: not user.disabled)
def name_starts_with(query, prefix):
return query.filter(lambda user: user.name.startswith('prefix'))
该name_starts_with
函数可以应用于另一个查询:
users1 = name_starts_with(active_users(), 'A').order_by(User.last_visited)
users2 = name_starts_with(recent_users(), 'B').filter(lambda user: user.online)
此外,我们正在开发查询扩展 API,这将允许程序员编写自定义查询方法。当我们发布此 API 时,可以通过以下方式将自定义查询方法链接在一起:
select(u for u in User).recent(days=3).name_starts_with('A')[:10]
希望我回答了你的问题。如果是这种情况,请将答案视为正确答案。