我希望我的 Dask 工作人员从 a 中获取 Postgres 连接ThreadedConnectionPool
,但是当像这样通过池时
from psycopg2.pool import ThreadedConnectionPool
def worker_pg(n, pool) -> None:
print(n)
work = db.from_sequence(range(4))
tcp = ThreadedConnectionPool(1, 800, "db_string")
work.map(worker_pg, pool=tcp).compute()
我收到序列化错误,例如:
TypeError: ('Could not serialize object of type ThreadedConnectionPool.', '<psycopg2.pool.ThreadedConnectionPool object at 0x7f99dc57b128>')
此外,虽然我一直在尝试这个,psycopg2
但我也很喜欢这个工作asyncpg
(性能原因)。但是,这增加了使用await
和async
fromasyncio
import asyncio
import asyncpg
async def get_pool():
p = await asyncpg.create_pool("db_string")
return p
pool = asyncio.get_event_loop().run_until_complete(get_pool())
work.map(worker_pg, pool=pool).compute()
尽管我似乎最终会遇到相同类型的错误,例如
TypeError: ('Could not serialize object of type Pool.', '<asyncpg.pool.Pool object at 0x7fdee9127818>')
非常感谢任何建议(或替代方案?)!