据我所知,当前版本(1.2.1)不支持此功能。原生 Scala 代码 (tree.py) 上的 Python 包装器仅定义“预测”函数,这些函数又调用相应的 Scala 对应项 (treeEnsembleModels.scala)。后者通过在二元决策中进行投票来做出决策。一个更简洁的解决方案是提供一个概率预测,该预测可以任意阈值或用于 ROC 计算,如 sklearn。应该为将来的版本添加此功能!
作为一种解决方法,我将 predict_proba 实现为纯 Python 函数(参见下面的示例)。它既不优雅也不高效,因为它在森林中的一组单独的决策树上运行循环。诀窍 - 或者更确切地说是一个肮脏的黑客 - 是访问 Java 决策树模型的数组并将它们转换为 Python 对应物。之后,您可以计算单个模型对整个数据集的预测,并使用“zip”在 RDD 中累积它们的总和。除以树的数量得到所需的结果。对于大型数据集,主节点中少量决策树的循环应该是可以接受的。
由于将 Python 集成到 Spark(在 Java 中运行)的困难,下面的代码相当棘手。应该非常小心,不要将任何复杂的数据发送到工作节点,这会导致由于序列化问题而导致的崩溃。任何引用 Spark 上下文的代码都不能在工作节点上运行。此外,不能序列化引用任何 Java 代码的代码。例如,在下面的代码中使用 len(trees) 而不是 ntrees 可能很诱人 - 砰!用 Java/Scala 编写这样的包装器会更加优雅,例如通过在工作节点上的决策树上运行循环,从而降低通信成本。
下面的测试函数表明 predict_proba 给出的测试误差与原始示例中使用的 predict 相同。
def predict_proba(rf_model, data):
'''
This wrapper overcomes the "binary" nature of predictions in the native
RandomForestModel.
'''
# Collect the individual decision tree models by calling the underlying
# Java model. These are returned as JavaArray defined by py4j.
trees = rf_model._java_model.trees()
ntrees = rf_model.numTrees()
scores = DecisionTreeModel(trees[0]).predict(data.map(lambda x: x.features))
# For each decision tree, apply its prediction to the entire dataset and
# accumulate the results using 'zip'.
for i in range(1,ntrees):
dtm = DecisionTreeModel(trees[i])
scores = scores.zip(dtm.predict(data.map(lambda x: x.features)))
scores = scores.map(lambda x: x[0] + x[1])
# Divide the accumulated scores over the number of trees
return scores.map(lambda x: x/ntrees)
def testError(lap):
testErr = lap.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))
def testClassification(trainingData, testData):
model = RandomForest.trainClassifier(trainingData, numClasses=2,
categoricalFeaturesInfo={},
numTrees=50, maxDepth=30)
# Compute test error by thresholding probabilistic predictions
threshold = 0.5
scores = predict_proba(model,testData)
pred = scores.map(lambda x: 0 if x < threshold else 1)
lab_pred = testData.map(lambda lp: lp.label).zip(pred)
testError(lab_pred)
# Compute test error by comparing binary predictions
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testError(labelsAndPredictions)
总而言之,这是学习 Spark 的一个很好的练习!