8

当我尝试将 df2 提供给 kmeans 时,出现以下错误

clusters = KMeans.train(df2, 10, maxIterations=30,
                        runs=10, initializationMode="random")

我得到的错误:

Cannot convert type <class 'pyspark.sql.types.Row'> into Vector

df2 是如下创建的数据框:

df = sqlContext.read.json("data/ALS3.json")
df2 = df.select('latitude','longitude')

df2.show()


     latitude|       longitude|

   60.1643075|      24.9460844|
   60.4686748|      22.2774728|

如何将这两列转换为 Vector 并将其提供给 KMeans?

4

2 回答 2

13

机器学习

问题是您错过了文档的示例,很明显该方法train需要 aDataFrame和 aVector作为特性。

要修改当前数据的结构,您可以使用VectorAssembler。在你的情况下,它可能是这样的:

from pyspark.sql.functions import *

vectorAssembler = VectorAssembler(inputCols=["latitude", "longitude"],
                                  outputCol="features")

# For your special case that has string instead of doubles you should cast them first.
expr = [col(c).cast("Double").alias(c) 
        for c in vectorAssembler.getInputCols()]

df2 = df2.select(*expr)
df = vectorAssembler.transform(df2)

此外,您还应该规范化features使用MinMaxScaler类以获得更好的结果。

MLLib

为了实现这一点,MLLib您需要首先使用 map 函数,将所有string值转换为Double,并将它们合并到DenseVector中。

rdd = df2.map(lambda data: Vectors.dense([float(c) for c in data]))

在此之后,您可以使用该变量训练您的MLlib 的 KMeans 模型。rdd

于 2016-03-21T22:49:46.640 回答
5

我让PySpark 2.3.1在DataFrame上执行KMeans,如下所示:

  1. 写出要包含在聚类分析中的列的列表:
feat_cols = ['latitude','longitude']`
  1. 您需要所有列都是数值
expr = [col(c).cast("Double").alias(c) for c in feat_cols]
df2 = df2.select(*expr)
  1. 创建您的特征向量mllib.linalg.Vectors
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
df3 = assembler.transform(df2).select('features')
  1. 您应该规范化您的功能,因为并不总是需要规范化,但它很少会受到伤害(更多关于这里的信息):
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=False)
scalerModel = scaler.fit(df3)
df4 = scalerModel.transform(df3).drop('features')\
                     .withColumnRenamed('scaledFeatures', 'features')
  1. 将您的DataFrame对象df4转换为密集向量RDD
from pyspark.mllib.linalg import Vectors
data5 = df4.rdd.map(lambda row: Vectors.dense([x for x in row['features']]))
  1. 使用获得的 RDD 对象作为 KMeans 训练的输入:
from pyspark.mllib.clustering import KMeans
model = KMeans.train(data5, k=3, maxIterations=10)
  1. 示例:对向量空间中的点p进行分类:
prediction = model.predict(p)
于 2017-04-27T19:11:06.267 回答