编辑了我的代码 - 现在它可以工作了我正在尝试通过 asyncpg 连接池从我的 Postgres 数据库中异步获取一些日期。基本上我的数据库包含大约 100 个不同的表(每个城市),我试图尽可能快地在一帧中收集所有数据。
import pandas as pd
import asyncpg
import asyncio
from time import time
def make_t():
lst = []
# iterator for sql tuple
for i in ['a',
'b',
'c']:
i1 = i
sql = """
SELECT
'%s' as city,
MAX(starttime) AS max_ts
FROM
"table_%s"
"""
lst.append(sql % (i, i1))
return tuple(lst)
async def get_data(pool, sql):
start = time()
async with pool.acquire() as conn:
stmt = await conn.prepare(sql)
columns = [a.name for a in stmt.get_attributes()]
data = await stmt.fetch()
print(f'Exec time: {time() - start}')
return pd.DataFrame(data, columns=columns)
async def main():
dsn = 'postgres://user:pass@127.0.0.1:5432/my_base'
cT = ['city', 'max_ts']
sqls = make_t()
pool = await asyncpg.create_pool(dsn=dsn, max_size=50)
start = time()
tasks = []
for sql in sqls:
tasks.append(loop.create_task(get_data(pool, sql)))
tasks = await asyncio.gather(*tasks)
df = pd.DataFrame(columns=cT)
for task in tasks:
# form df from corutine results
df = df.append(task.result())
print(f'total exec time: {time() - start} secs')
print('exiting main')
return df
loop = asyncio.get_event_loop()
df = loop.run_until_complete(main())
loop.close()
print('exiting program')
Python 3.6.5 :: Anaconda, Inc.
给我这个错误:
Traceback(最近一次调用最后一次):文件“”,第 319 行,在文件“/Users/fixx/anaconda3/lib/python3.6/asyncio/base_events.py”中,第 468 行,在 run_until_complete 返回 future.result() 文件中“”,第 308 行,在主文件中“/Users/fixx/anaconda3/lib/python3.6/asyncio/tasks.py”,第 594 行,在 set(coros_or_futures) 中收集 arg:TypeError: unhashable type: 'list '
我想不通,为什么?我的 sqls 在元组中!