-1

我正在使用这个读取数据: ddf1 = dd.read_sql_table('mytable', conn_string, index_col='id', npartitions=8)

当然,由于惰性计算,这会立即运行。这个表有几亿行。

接下来,我要过滤这个 Dask 数据框:

ddf2 = ddf1.query('some_col == "converted"')

最后,我想将其转换为 Pandas 数据框。结果应该只有大约 8000 行:

ddf3 = ddf2.compute()

但是,这需要很长时间(约 1 小时)。我可以就如何大幅加快速度获得任何建议吗?我试过使用.compute(scheduler='threads'),改变分区的数量,但到目前为止没有一个工作。我究竟做错了什么?

4

1 回答 1

1

首先,您可以使用 sqlalchemy 表达式语法在查询中对您的过滤子句进行编码,并在服务器端进行过滤。如果数据传输是您的瓶颈,那是您最好的解决方案,尤其是过滤列被索引。

根据您的数据库后端,sqlalchemy 可能不会释放 GIL,因此您的分区不能在线程中并行运行。你得到的只是线程之间的争用和额外的开销。您应该将分布式调度程序与进程一起使用。

当然,请看你的CPU和内存使用情况;使用分布式调度程序,您还可以访问诊断仪表板。您还应该关注每个分区在内存中的大小。

于 2020-03-16T20:48:03.467 回答