60

如何SQLAlchemy成为?Tornado_ async我在async mongo 示例中找到了 MongoDB 的示例,但我找不到类似motorfor 的东西SQLAlchemy。有谁知道如何进行SQLAlchemy查询以执行tornado.gen(我在MySQL下面使用SQLAlchemy,目前我的处理程序从数据库读取并返回结果,我想让这个异步)。

4

6 回答 6

87

ORM 不太适合显式异步编程,即程序员必须在任何使用网络访问的事情发生时产生显式回调。一个主要原因是 ORM 广泛使用延迟加载模式,这或多或少与显式异步不兼容。如下所示的代码:

user = Session.query(User).first()
print user.addresses

实际上会发出两个单独的查询 - 当您说first()要加载一行时,下一个是当您说 时user.addresses,如果.addresses集合尚不存在或已过期。从本质上讲,几乎每一行处理 ORM 构造的代码都可能阻塞 IO,因此您将在几秒钟内陷入大量回调意大利面条 - 更糟糕的是,绝大多数这些代码行实际上不会阻塞 IO,因此,将回调连接在一起以进行简单的属性访问操作的所有开销也会使您的程序效率大大降低。

显式异步模型的一个主要问题是它们给复杂的系统增加了巨大的 Python 函数调用开销——不仅仅是像延迟加载那样在面向用户的方面,而且在内部方面以及系统如何围绕Python 数据库 API (DBAPI)。对于 SQLAlchemy 来说,即使有基本的异步支持,也会对绝大多数不使用异步模式的程序,甚至是那些不高度并发的异步程序造成严重的性能损失。考虑 SQLAlchemy,或任何其他 ORM 或抽象层,可能具有如下代码:

def execute(connection, statement):
     cursor = connection.cursor()
     cursor.execute(statement)
     results = cursor.fetchall()
     cursor.close()
     return results

上面的代码执行看似简单的操作,在连接上执行 SQL 语句。但是使用像 psycopg2 的异步扩展这样的完全异步 DBAPI,上述代码在 IO 上至少阻塞了 3 次。因此,以显式异步风格编写上述代码,即使没有使用异步引擎并且回调实际上并未阻塞,这意味着上述外部函数调用至少成为三个函数调用,而不是一个,不包括施加的开销由显式异步系统或 DBAPI 自己调用。因此,一个简单的应用程序会自动受到 3 倍于围绕语句执行的简单抽象的函数调用开销的惩罚。而在 Python 中,函数调用开销就是一切

由于这些原因,我仍然对围绕显式异步系统的炒作感到不那么兴奋,至少在某种程度上,有些人似乎想要对所有事情都进行异步处理,比如交付网页(参见 node.js)。我建议改用隐式异步系统,尤其是gevent,您可以获得异步模型的所有非阻塞 IO 优势,并且没有显式回调的结构冗长/缺点。我继续尝试理解这两种方法的用例,所以我对显式异步方法作为所有问题的解决方案的吸引力感到困惑,即正如您在 node.js 中看到的那样 - 我们在减少冗长和代码复杂性的首要位置,对于简单的事情(如交付网页)的显式异步似乎什么都不做,只是添加样板文件,如果阻塞 IO 甚至是这样的问题这样的情况(很多大容量网站都可以使用同步 IO 模型)。基于 Gevent 的系统已经过生产验证,并且越来越受欢迎,所以如果你喜欢 ORM 提供的代码自动化,

更新:Nick Coghlan 指出了他关于显式与隐式异步主题的精彩文章,这也是必须阅读的。而且我也更新了pep-3156 现在欢迎与 gevent 的互操作性这一事实,扭转了其先前声明的对 gevent 的不感兴趣,这在很大程度上要归功于 Nick 的文章。所以在未来,一旦集成这些方法的系统可用,我会推荐使用 gevent 的 Tornado 混合数据库逻辑。

于 2013-05-12T00:44:41.280 回答
34

我过去也遇到过同样的问题,但我找不到可靠的 Async-MySQL 库。然而,使用Asyncio + Postgres有一个很酷的解决方案。你只需要使用aiopg库,它附带了开箱即用的 SQLAlchemy 支持:

import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa

metadata = sa.MetaData()

tbl = sa.Table('tbl', metadata,
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('val', sa.String(255)))

async def create_table(engine):
    async with engine.acquire() as conn:
        await conn.execute('DROP TABLE IF EXISTS tbl')
        await conn.execute('''CREATE TABLE tbl (
                                  id serial PRIMARY KEY,
                                  val varchar(255))''')

async def go():
    async with create_engine(user='aiopg',
                             database='aiopg',
                             host='127.0.0.1',
                             password='passwd') as engine:

        async with engine.acquire() as conn:
            await conn.execute(tbl.insert().values(val='abc'))

            async for row in conn.execute(tbl.select()):
                print(row.id, row.val)

loop = asyncio.get_event_loop()
loop.run_until_complete(go())

如@cglacet 所述更新

于 2014-11-14T08:06:20.247 回答
11

不是龙卷风,而是我们在GINO 项目的asyncio中使 SQLAlchemy 异步:

import asyncio
from gino import Gino, enable_task_local
from sqlalchemy import Column, Integer, Unicode, cast

db = Gino()


class User(db.Model):
    __tablename__ = 'users'

    id = Column(Integer(), primary_key=True)
    nickname = Column(Unicode(), default='noname')


async def main():
    await db.create_pool('postgresql://localhost/gino')

    # Create object, `id` is assigned by database
    u1 = await User.create(nickname='fantix')
    print(u1.id, u1.nickname)  # 1 fantix

    # Retrieve the same row, as a different object
    u2 = await User.get(u1.id)
    print(u2.nickname)  # fantix

    # Update affects only database row and the operating object
    await u2.update(nickname='daisy')
    print(u2.nickname)  # daisy
    print(u1.nickname)  # fantix

    # Returns all user objects with "d" in their nicknames
    users = await User.query.where(User.nickname.contains('d')).gino.all()

    # Find one user object, None if not found
    user = await User.query.where(User.nickname == 'daisy').gino.first()

    # Execute complex statement and return command status
    status = await User.update.values(
        nickname='No.' + cast(User.id, Unicode),
    ).where(
        User.id > 10,
    ).gino.status()

    # Iterate over the results of a large query in a transaction as required
    async with db.transaction():
        async for u in User.query.order_by(User.id).gino.iterate():
            print(u.id, u.nickname)


loop = asyncio.get_event_loop()
enable_task_local(loop)
loop.run_until_complete(main())

它看起来有点像,但实际上与 SQLAlchemy ORM完全不同。因为我们只使用了 SQLAlchemy 核心的一部分,并在其上构建了一个简单的 ORM。它在下面使用asyncpg,因此它仅适用于 PostgreSQL

更新:感谢 Vladimir Goncharov 的贡献,GINO 现在支持 Tornado。在此处查看文档

于 2017-08-12T04:13:52.193 回答
4

我在下一个方式中使用带有 sqlalchemy 的龙卷风:


from tornado_mysql import pools
from sqlalchemy.sql import table, column, select, join
from sqlalchemy.dialects import postgresql, mysql

# from models import M, M2

t = table(...)
t2 = table(...)

xxx_id = 10

j = join(t, t2, t.c.t_id == t2.c.id)
s = select([t]).select_from(j).where(t.c.xxx == xxx_id)

sql_str = s.compile(dialect=mysql.dialect(),compile_kwargs={"literal_binds": True})


pool = pools.Pool(conn_data...)
cur = yield pool.execute(sql_str)
data = cur.fetchone()

在这种情况下,我们可以使用 sqlalchemy 模型和 sqlalchemy 工具来构建查询。

于 2016-05-18T14:32:13.127 回答
1

SQLAlchemy 1.4 原生支持 asyncio(目前处于测试阶段):

https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html

于 2021-02-17T20:07:02.113 回答
0

我在下一个方式中使用 tornado6 和 sqlalchemy:

from tornado.ioloop import IOLoop

def sql_function():
    pass

class Handler(tornado.web.RequestHandler):
    async def post(self):
        args = get_front_end_args()
        result = await IOLoop.current().run_in_executor(None,sql_function,*(args))
        self.write({"result":result})
于 2020-05-22T09:51:59.643 回答