6

我有一个 python 脚本,它执行以下操作:i。它采用数据的输入文件(通常是嵌套的 JSON 格式) ii。将数据逐行传递给另一个函数,该函数将数据处理为所需的格式 iii. 最后将输出写入文件。

这是我当前执行此操作的简单 python 行...

def manipulate(line):
    # a pure python function which transforms the data
    # ...
    return manipulated_json

for line in f:
    components.append(manipulate(ujson.loads(line)))
    write_to_csv(components)`

这是可行的,但是由于 python GIL 将其限制为服务器上的一个核心,它的速度非常慢,尤其是在处理大量数据的情况下。

我通常处理的数据量约为 4 gigs gzip 压缩,但有时我必须处理数百 gigs gzip 压缩的数据。它不一定是大数据,但仍然不能全部在内存中处理,并且使用 Python 的 GIL 处理速度非常慢。

在寻找优化数据处理的解决方案时,我遇到了 dask。虽然 PySpark 在当时对我来说似乎是显而易见的解决方案,但 dask 的承诺和它的简单性赢得了我的青睐,我决定试一试。

在对 dask 以及如何使用它进行了大量研究之后,我编写了一个非常小的脚本来复制我当前的过程。脚本如下所示:

import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`

这可以工作并产生与我原来的非 dask 脚本相同的结果,但它仍然只使用服务器上的一个 CPU。所以,它根本没有帮助。事实上,它更慢。

我究竟做错了什么?我错过了什么吗?我对 dask 还是很陌生,所以让我知道我是否忽略了某些事情,或者我是否应该做一些完全不同的事情。

此外,是否有任何替代 dask 可以使用服务器的全部容量(即所有 CPU)来完成我需要做的事情?

谢谢,

4

3 回答 3

2

这里的问题是 with dask.dataframe.to_csv,它迫使你进入单核模式。

我建议使用dask.bag来进行阅读和操作,然后并行转储到一堆 CSV 文件。转储到多个 CSV 文件比转储到单个 CSV 文件更容易协调。

import dask.bag as bag
import json
b = bag.from_filenames('input.json.gz').map(json.loads).map(manipulate).concat()
b.map(lambda t: ','.join(map(str, t)).to_textfiles('out.*.csv').compute()

尝试并行读取单个 GZIP 文件也可能存在问题,但以上内容应该可以帮助您入门。

于 2015-12-03T21:14:16.783 回答
0

如果您向MyFiles-*.csvdask提供基于 glob 的文件名,dataframe.to_csv()您应该能够将数据帧输出到磁盘。它将创建多个文件而不是 1 个大型 csv 文件。请参阅此线程以获取更多信息 https://groups.google.com/a/continuum.io/forum/#!searchin/blaze-dev/to_csv/blaze-dev/NCQfCoOWEcI/S7fwuCfeCgAJ

MyFiles-0001.csv  
MyFiles-0002.csv 
....
于 2016-08-26T18:36:11.830 回答
0

似乎袋子的平行度取决于它们所拥有的分区数量。

对我来说,跑步

mybag=bag.from_filenames(filename, chunkbytes=1e7)
mybag.npartitions

产量

1746

这解决了问题并使处理完全可并行化。

于 2016-01-05T12:25:31.940 回答