I got PySpark 2.3.1 to perform KMeans on a DataFrame as follows:
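For reference, a minimal sketch of what the input DataFrame (called df2 below) might look like; the SparkSession variable spark and the sample coordinates are assumptions for illustration only:
from pyspark.sql import SparkSession
# Hypothetical setup: a SparkSession and a toy DataFrame with the two
# coordinate columns used in the steps below
spark = SparkSession.builder.appName("kmeans-example").getOrCreate()
df2 = spark.createDataFrame(
    [(41.89, 12.49), (48.86, 2.35), (40.71, -74.01)],
    ["latitude", "longitude"])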
- Write a list of the columns you want to include in the clustering analysis:
feat_cols = ['latitude','longitude']
- You need all of the columns to be numeric, so cast them:
expr = [col(c).cast("Double").alias(c) for c in feat_cols]
df2 = df2.select(*expr)
- Create your feature vector with VectorAssembler:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
df3 = assembler.transform(df2).select('features')
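If you want a quick sanity check of the assembled vectors, something like this works (the column name features comes from the assembler above):
df3.show(3, truncate=False)  # each row now holds one DenseVector, e.g. [41.89,12.49]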
- You should standardize your features, since standardization is not always required, but it rarely hurts (more on this here):
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(
inputCol="features",
outputCol="scaledFeatures",
withStd=True,
withMean=False)
scalerModel = scaler.fit(df3)
df4 = scalerModel.transform(df3).drop('features')\
.withColumnRenamed('scaledFeatures', 'features')
- Convert your DataFrame df4 into an RDD of dense vectors:
from pyspark.mllib.linalg import Vectors
data5 = df4.rdd.map(lambda row: Vectors.dense([x for x in row['features']]))
- Use the resulting RDD as the input for KMeans training:
from pyspark.mllib.clustering import KMeans
model = KMeans.train(data5, k=3, maxIterations=10)
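Before classifying anything you can inspect the fitted model; clusterCenters and computeCost are part of pyspark.mllib.clustering.KMeansModel, and the cost is only a rough quality indicator:
# Cluster centres, expressed in the scaled feature space (list of numpy arrays)
for center in model.clusterCenters:
    print(center)
# Within-set sum of squared errors over the training data
print("WSSSE = " + str(model.computeCost(data5)))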
- Example: classify a point p in your vector space:
prediction = model.predict(p)
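Keep in mind that p must be a vector in the same scaled feature space the model was trained on. A minimal sketch, reusing a training point for illustration; predict() also accepts an RDD, so you can label every point in one call:
# Reuse the first (already scaled) training vector as an example point
p = data5.first()
print(model.predict(p))       # cluster index for a single point
# Label all points at once by passing the whole RDD of vectors
labels = model.predict(data5)
print(labels.take(5))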