0

我正在尝试使用 sklearn MiniBatchKMeans 来聚类一个相当大的数据集(150k 样本和 150k 特征)。我认为我可以使用来自 dask_ml 的增量来使事情变得更快,以将我的数据分成块。这是我在虚拟数据集上的代码片段:

    from dask_ml.datasets import make_blobs
    from dask_ml.wrappers import Incremental
    from sklearn.cluster import MiniBatchKMeans
    import dask.array as da
    import dask

    dataset = da.random.random((150000, 150000), chunks = (1000, 1000))
    kmeans = MiniBatchKMeans(n_clusters = 3)
    inc = Incremental(kmeans).fit(dataset)
    predicted_labels = inc.predict(dataset).compute()
    print(predicted_labels) 

该进程在 compute() 步骤中被终止。我不认为在 150k 点上运行 compute() 会如此密集。它因这个奇怪的错误而失败:

ValueError: X has 150000 features, but MiniBatchKMeans is expecting 1000 
features as input.

我不明白 MiniBatchKMeans 功能的大小与标签上的 compute() 有什么关系

编辑 在第一个答案之后,我想澄清一下我在标签(而不是数据集!)上使用了 compute(),因为我需要它们来进行一些绘图操作。这些值需要在 RAM 上才能被 matplotlib 函数使用。

(150k, ) 的数组应该能够舒适地安装在 RAM 上,我不确定它为什么会失败!

4

1 回答 1

0

该过程失败,因为该compute()方法会将整个数据集带到本地 RAM。请参阅文档

.compute()您可以通过调用方法或dask.compute(...)函数将任何 dask 集合转换为具体值。

并进一步:

但是,如果您尝试将整个数据集带回本地 RAM,这种方法通常会失败。

>>> df.compute() # MemoryError(...)

这也是为什么您会收到大小不匹配的值错误的原因,因为您将一次将整个数据集传递给该predict()方法,而不是一个接一个地传递较小的块。删除该compute()语句,它将正常工作:

predicted_labels = inc.predict(dataset)

由于您的目标似乎是“让事情变得更快”,请注意以下几点:

Dask Array 的每个块都被馈送到底层估计器的partial_fit方法。训练是完全按顺序进行的,因此您不会注意到并行性带来的大量训练时间加速。在分布式环境中,您应该注意到避免额外 IO 带来的一些加速,以及模型通常比数据小得多的事实,因此在机器之间移动的速度更快。

(从这里

于 2021-06-14T10:25:19.353 回答