[UPDATE: As of Spark 2.2, both PCA and SVD are available in PySpark - see JIRA ticket SPARK-6227 and PCA & PCAModel for Spark ML 2.2; the original answer below is still applicable for older Spark versions.]
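For completeness, here is a minimal sketch of how the newer API exposes this information directly; it assumes Spark >= 2.2, where pyspark.ml.feature.PCAModel exposes the fitted loadings (pc) and explainedVariance, and an already-created SparkSession named spark (that name is my assumption, not part of the original answer):

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])   # `spark` is an existing SparkSession (assumption)

model = PCA(k=2, inputCol="features", outputCol="pca_features").fit(df)
print(model.pc)                 # the principal components (loadings), one column per component
print(model.explainedVariance)  # fraction of variance explained by each of the k components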
Well, it seems incredible, but indeed there is no way to extract such information from a PCA decomposition (at least as of Spark 1.5). But again, there have been many similar "complaints" - see here, for example, about not being able to extract the best parameters from a CrossValidatorModel.
Fortunately, some months ago, I attended the 'Scalable Machine Learning' MOOC by AMPLab (Berkeley) & Databricks, i.e. the creators of Spark, where we implemented a full PCA pipeline 'by hand' as part of the homework assignments. I have since modified my functions (rest assured, I got full credit :-) so as to work with dataframes as inputs (instead of RDDs), of the same format as yours (i.e. Rows of DenseVectors containing the numerical features).
We first need to define an intermediate function, estimateCovariance, as follows:
import numpy as np

def estimateCovariance(df):
    """Compute the covariance matrix for a given dataframe.

    Note:
        The multi-dimensional covariance array should be calculated using outer products. Don't
        forget to normalize the data by first subtracting the mean.

    Args:
        df: A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.

    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
            length of the arrays in the input dataframe.
    """
    m = df.select(df['features']).map(lambda x: x[0]).mean()
    dfZeroMean = df.select(df['features']).map(lambda x: x[0]).map(lambda x: x - m)  # subtract the mean
    return dfZeroMean.map(lambda x: np.outer(x, x)).sum() / df.count()
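For intuition, this is the same computation one would do locally with NumPy: the average of the outer products of the mean-centered rows. The following purely illustrative local equivalent (the helper name and the assumption that the data fit in memory are mine, not part of the original pipeline) shows the idea:

import numpy as np

def estimateCovarianceLocal(X):
    """Local (non-distributed) equivalent: X is an (n, d) NumPy array of the feature rows."""
    Xc = X - X.mean(axis=0)                              # subtract the mean
    return sum(np.outer(x, x) for x in Xc) / X.shape[0]  # average of the outer products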
Then, we can write a main pca function, as follows:
from numpy.linalg import eigh

def pca(df, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.

    Note:
        All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
        each eigenvector as a column. This function should also return eigenvectors as columns.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
            scores, eigenvalues). Eigenvectors is a multi-dimensional array where the number of
            rows equals the length of the arrays in the input `RDD` and the number of columns equals
            `k`. The `RDD` of scores has the same number of rows as `data` and consists of arrays
            of length `k`. Eigenvalues is an array of length d (the number of features).
    """
    cov = estimateCovariance(df)
    col = cov.shape[1]
    eigVals, eigVecs = eigh(cov)                 # eigh returns eigenvalues in ascending order
    inds = np.argsort(eigVals)
    eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]    # reorder eigenvectors: largest eigenvalue first
    components = eigVecs[0:k]
    eigVals = eigVals[inds[-1:-(col+1):-1]]      # sort eigenvalues (largest to smallest)
    score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T))
    # Return the `k` principal components, `k` scores, and all eigenvalues
    return components.T, score, eigVals
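A quick illustrative aside on the indexing used above: numpy.linalg.eigh returns the eigenvalues in ascending order, and the slice inds[-1:-(col+1):-1] simply walks the sorted indices backwards, i.e. largest eigenvalue first. A tiny standalone sketch (toy numbers of my own):

import numpy as np

vals = np.array([0.5, 3.0, 1.2])
inds = np.argsort(vals)                   # ascending order: [0, 2, 1]
print(vals[inds[-1:-(len(vals)+1):-1]])   # [3.0, 1.2, 0.5] - largest to smallest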
Test
Let's see first the results of the existing method, using the example data from the Spark ML PCA documentation (modifying them so as to be all DenseVectors):
from pyspark.ml.feature import *
from pyspark.mllib.linalg import Vectors
data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data,["features"])
pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca_extracted.fit(df)
model.transform(df).collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]
Then, with our method:
comp, score, eigVals = pca(df)
score.collect()
[array([ 1.64857282, 4.0132827 ]),
array([-4.64510433, 1.11679727]),
array([-6.42888054, 5.33795143])]
Let me stress that we don't use any collect() methods in the functions we have defined ourselves - score is an RDD, as it should be.
Notice that the signs of our second column are all opposite to the ones derived by the existing method; but this is not an issue: according to the (freely downloadable) An Introduction to Statistical Learning, co-authored by Hastie & Tibshirani, p. 382:
Each principal component loading vector is unique, up to a sign flip. This means that two different software packages will yield the same principal component loading vectors, although the signs of those loading vectors may differ. The signs may differ because each principal component loading vector specifies a direction in p-dimensional space: flipping the sign has no effect as the direction does not change. [...] Similarly, the score vectors are unique up to a sign flip, since the variance of Z is the same as the variance of -Z.
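As a quick illustration of this point (a sketch of mine reusing the model, df and score objects from the test above), the two sets of scores should coincide in absolute value:

import numpy as np

ours = np.array(score.collect())
theirs = np.array([row.pca_features.toArray() for row in model.transform(df).collect()])
print(np.allclose(np.abs(ours), np.abs(theirs)))   # expected: True - scores agree up to sign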
Finally, now that we have the eigenvalues available, it is trivial to write a function for the percentage of variance explained:
def varianceExplained(df, k=1):
    """Calculate the fraction of variance explained by the top `k` eigenvectors.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k: The number of principal components to consider.

    Returns:
        float: A number between 0 and 1 representing the percentage of variance explained
            by the top `k` eigenvectors.
    """
    components, scores, eigenvalues = pca(df, k)
    return sum(eigenvalues[0:k]) / sum(eigenvalues)
varianceExplained(df,1)
# 0.79439325322305299
As a test, we also check that the variance explained in our example data is 1.0 for k=5 (since the original data are 5-dimensional):
varianceExplained(df,5)
# 1.0
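Equivalently, since the pca call above already returned all the eigenvalues, the cumulative fraction of variance explained for every k can be read off directly; a small illustrative follow-up reusing eigVals from the test:

import numpy as np

print(np.cumsum(eigVals) / np.sum(eigVals))
# the first entry matches varianceExplained(df, 1), the last one is 1.0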
[Developed & tested with Spark 1.5.0 & 1.5.1]