我有一个 python 脚本,它执行以下操作:i。它采用数据的输入文件(通常是嵌套的 JSON 格式) ii。将数据逐行传递给另一个函数,该函数将数据处理为所需的格式 iii. 最后将输出写入文件。
这是我当前执行此操作的简单 python 行...
def manipulate(line):
# a pure python function which transforms the data
# ...
return manipulated_json
for line in f:
components.append(manipulate(ujson.loads(line)))
write_to_csv(components)`
这是可行的,但是由于 python GIL 将其限制为服务器上的一个核心,它的速度非常慢,尤其是在处理大量数据的情况下。
我通常处理的数据量约为 4 gigs gzip 压缩,但有时我必须处理数百 gigs gzip 压缩的数据。它不一定是大数据,但仍然不能全部在内存中处理,并且使用 Python 的 GIL 处理速度非常慢。
在寻找优化数据处理的解决方案时,我遇到了 dask。虽然 PySpark 在当时对我来说似乎是显而易见的解决方案,但 dask 的承诺和它的简单性赢得了我的青睐,我决定试一试。
在对 dask 以及如何使用它进行了大量研究之后,我编写了一个非常小的脚本来复制我当前的过程。脚本如下所示:
import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`
这可以工作并产生与我原来的非 dask 脚本相同的结果,但它仍然只使用服务器上的一个 CPU。所以,它根本没有帮助。事实上,它更慢。
我究竟做错了什么?我错过了什么吗?我对 dask 还是很陌生,所以让我知道我是否忽略了某些事情,或者我是否应该做一些完全不同的事情。
此外,是否有任何替代 dask 可以使用服务器的全部容量(即所有 CPU)来完成我需要做的事情?
谢谢,
吨