1

编辑了我的代码 - 现在它可以工作了我正在尝试通过 asyncpg 连接池从我的 Postgres 数据库中异步获取一些日期。基本上我的数据库包含大约 100 个不同的表(每个城市),我试图尽可能快地在一帧中收集所有数据。

    import pandas as pd
    import asyncpg
    import asyncio
    from time import time


    def make_t():
        lst = []
        # iterator for sql tuple
        for i in ['a',
                  'b',
                  'c']:
            i1 = i
            sql = """
    SELECT
    '%s' as city,
    MAX(starttime) AS max_ts
    FROM
    "table_%s"
    """
            lst.append(sql % (i, i1))
        return tuple(lst)


    async def get_data(pool, sql):
        start = time()
        async with pool.acquire() as conn:
           stmt = await conn.prepare(sql)
           columns = [a.name for a in stmt.get_attributes()]
           data = await stmt.fetch()
           print(f'Exec time: {time() - start}')
           return pd.DataFrame(data, columns=columns)


    async def main():
        dsn = 'postgres://user:pass@127.0.0.1:5432/my_base'
        cT = ['city', 'max_ts']
        sqls = make_t()
        pool = await asyncpg.create_pool(dsn=dsn, max_size=50)
        start = time()
        tasks = []

        for sql in sqls:
            tasks.append(loop.create_task(get_data(pool, sql)))

        tasks = await asyncio.gather(*tasks)
        df = pd.DataFrame(columns=cT)
        for task in tasks:
            # form df from corutine results
            df = df.append(task.result())

        print(f'total exec time: {time() - start} secs')
        print('exiting main')
        return df


    loop = asyncio.get_event_loop()
    df = loop.run_until_complete(main())
    loop.close()

    print('exiting program')

Python 3.6.5 :: Anaconda, Inc.

给我这个错误:

Traceback(最近一次调用最后一次):文件“”,第 319 行,在文件“/Users/fixx/anaconda3/lib/python3.6/asyncio/base_events.py”中,第 468 行,在 run_until_complete 返回 future.result() 文件中“”,第 308 行,在主文件中“/Users/fixx/anaconda3/lib/python3.6/asyncio/tasks.py”,第 594 行,在 set(coros_or_futures) 中收集 arg:TypeError: unhashable type: 'list '

我想不通,为什么?我的 sqls 在元组中!

4

1 回答 1

1

asyncio.gather接受协程作为单独的参数,并且您正在向它发送任务列表。您必须使用*运算符才能正确调用gather

        tasks = await asyncio.gather(*tasks)
于 2019-02-09T00:05:30.913 回答