2

我希望我的 Dask 工作人员从 a 中获取 Postgres 连接ThreadedConnectionPool,但是当像这样通过池时

from psycopg2.pool import ThreadedConnectionPool

def worker_pg(n, pool) -> None:
    print(n)

work = db.from_sequence(range(4))
tcp = ThreadedConnectionPool(1, 800, "db_string")

work.map(worker_pg, pool=tcp).compute()

我收到序列化错误,例如:

TypeError: ('Could not serialize object of type ThreadedConnectionPool.', '<psycopg2.pool.ThreadedConnectionPool object at 0x7f99dc57b128>')

此外,虽然我一直在尝试这个,psycopg2但我也很喜欢这个工作asyncpg(性能原因)。但是,这增加了使用awaitasyncfromasyncio

import asyncio
import asyncpg

async def get_pool():
    p = await asyncpg.create_pool("db_string")
    return p

pool = asyncio.get_event_loop().run_until_complete(get_pool())

work.map(worker_pg, pool=pool).compute()

尽管我似乎最终会遇到相同类型的错误,例如

TypeError: ('Could not serialize object of type Pool.', '<asyncpg.pool.Pool object at 0x7fdee9127818>')

非常感谢任何建议(或替代方案?)!

4

2 回答 2

1

正如评论中所建议的,您可能会考虑让您的每个任务都打开与 Postgres 的连接,执行查询,然后关闭该连接。

不幸的是,Dask 无法在机器之间移动活动的数据库连接。这些对象与它们启动的过程密切相关。

于 2020-03-28T00:31:35.860 回答
-1

您可以在工作之上编写一个简单的包装器,以使其使用自己的数据库连接。这是一个简单的示例 - 可能可以根据您的需要进一步优化。

def my_task(conn, more_args):
    """Use psycopg2 conn to run a task"""
    
    # Something complicated here
    pass

def run_my_task(more_args):
    """Wraps my_task and gives it its own conn"""
    with psycopg2.connect(...) as conn:
        my_task(conn, more_args)

run_my_task = dask.delayed(run_my_task)
jobs = []
for i in range(10):
    jobs.append(run_my_task(i))
dask.compute(*jobs)
于 2020-07-16T14:20:28.817 回答