它的工作原理是这样的:首先您创建一个汇总每个客户销售数据的子查询,然后对加入此子查询的卖家执行汇总查询。
customers = (session
.query(Customer.seller_id, func.count().label('bought_count'))
.join(ProductBought)
.group_by(Customer)
.subquery())
result = (session
.query(Seller, func.sum(customers.c.bought_count).label('total_sales'))
.join(customers, Seller.id==customers.c.seller_id)
.group_by(Seller)
.order_by(desc('total_sales'))
.first())
包含模型定义和示例数据初始化的完整代码:
from sqlalchemy import (create_engine, event,
Column, Integer, String,
ForeignKey,
desc, func)
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from random import choice
engine = create_engine('sqlite:///sellers.db', echo=True)
event.listen(engine, 'connect',
lambda conn, rec: conn.execute('PRAGMA foreign_keys=ON;'))
Session = sessionmaker(bind=engine)
class Base(object):
@declared_attr
def __tablename__(cls):
return cls.__name__.lower()
id = Column(Integer, primary_key=True, autoincrement=True)
def __repr__(self):
clsname = self.__class__.__name__
return '<%s(%d)>' % (clsname, self.id)
Base = declarative_base(cls=Base)
class Seller(Base):
pass
class Customer(Base):
seller_id = Column(Integer, ForeignKey(Seller.id), nullable=False)
seller = relationship(Seller, backref='customers')
class Product(Base):
pass
class ProductBought(Base):
product_id = Column(Integer, ForeignKey(Product.id), nullable=False)
product = relationship(Product, backref='sales')
customer_id = Column(Integer, ForeignKey(Customer.id), nullable=False)
customer = relationship(Customer, backref='purchases')
def prepare_data():
Base.metadata.create_all(engine)
session = Session()
sellers = [Seller() for _ in xrange(5)]
customers = [Customer(seller=choice(sellers)) for _ in xrange(20)]
products = [Product() for _ in xrange(10)]
sales = [ProductBought(product=choice(products), customer=choice(customers))
for _ in xrange(100)]
session.add_all(sellers + customers + products + sales)
session.commit()
session.close()
def top_seller():
session = Session()
customers = (session
.query(Customer.seller_id, func.count().label('bought_count'))
.join(ProductBought)
.group_by(Customer)
.subquery())
result = (session
.query(Seller, func.sum(customers.c.bought_count).label('total_sales'))
.join(customers, Seller.id==customers.c.seller_id)
.group_by(Seller)
.order_by(desc('total_sales'))
.first())
session.close()
return result
def main():
Base.metadata.drop_all(engine)
prepare_data()
print top_seller()
if __name__ == "__main__":
main()