8

我有一个 json 文件目录,我试图将其转换为 dask DataFrame 并将其保存到 castra。它们之间有 200 个包含 O(10**7) json 记录的文件。代码非常简单,主要遵循教程示例。

import dask.dataframe as dd
import dask.bag as db
import json
txt = db.from_filenames('part-*.json')
js = txt.map(json.loads)
df = js.to_dataframe()
cs=df.to_castra("data.castra")

我在 32 核机器上运行它,但代码仅 100% 使用一个核。我对文档的理解是这段代码是并行执行的。为什么不是?我是不是误会了什么?

4

1 回答 1

8

您的最终集合是一个 dask 数据帧,默认情况下使用线程,您必须明确告诉 dask 使用进程。

您可以在全球范围内执行此操作

import dask
dask.config.set(scheduler='multiprocessing')

或者只是在to_castra通话中执行此操作

df.to_castra("data.castra", scheduler='multiprocessing')

此外,作为一个警告,卡斯特拉主要是一个实验。它速度相当快,但也不像 HDF5 或 Parquet 那样成熟。

于 2016-02-19T23:04:10.023 回答