2

使用 Spark MLLib,我会构建一个模型(如RandomForest),然后可以通过加载模型并predict在其上传递特征向量来在 Spark 之外对其进行评估。

似乎使用 Spark ML,predict现在被调用transform并且只作用于DataFrame.

有什么方法可以DataFrame在 Spark 之外构建,因为似乎需要 SparkContext 来构建 DataFrame?

我错过了什么吗?

4

3 回答 3

2

Re:有什么方法可以在 Spark 之外构建 DataFrame 吗?

这不可能。DataFrames 存在于 SQLContext 中,它存在于 SparkContext 中。也许您可以以某种方式解决它,但整个故事是 DataFrames 和 SparkContext 之间的连接是设计使然。

于 2016-03-16T07:02:53.403 回答
2

这是我在火花上下文之外使用火花模型的解决方案(使用 PMML):

  1. 您使用这样的管道创建模型:

SparkConf sparkConf = new SparkConf();

SparkSession session = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate();   
String tableName = "schema.table";
Properties dbProperties = new Properties();
dbProperties.setProperty("user",vKey);
dbProperties.setProperty("password",password);
dbProperties.setProperty("AuthMech","3");
dbProperties.setProperty("source","jdbc");
dbProperties.setProperty("driver","com.cloudera.impala.jdbc41.Driver");
String tableName = "schema.table";
String simpleUrl = "jdbc:impala://host:21050/schema"
Dataset<Row> data = session.read().jdbc(simpleUrl ,tableName,dbProperties);
String[] inputCols = {"column1"};
StringIndexer indexer = new StringIndexer().setInputCol("column1").setOutputCol("indexed_column1");
StringIndexerModel alphabet  = indexer.fit(data);
data = alphabet.transform(data);
VectorAssembler assembler = new VectorAssembler().setInputCols(inputCols).setOutputCol("features");
Predictor p = new GBTRegressor();
p.set("maxIter",20);
p.set("maxDepth",2);
p.set("maxBins",204);
p.setLabelCol("faktor");
PipelineStage[] stages = {indexer,assembler, p};
Pipeline pipeline = new Pipeline();
pipeline.setStages(stages);
PipelineModel pmodel = pipeline.fit(data);
PMML pmml = ConverterUtil.toPMML(data.schema(),pmodel);
FileOutputStream fos = new FileOutputStream("model.pmml");
JAXBUtil.marshalPMML(pmml,new StreamResult(fos));
  1. 使用 PPML 进行预测(在本地,没有 spark 上下文,可以应用于参数 Map 而不是 DataFrame):

    PMML pmml = org.jpmml.model.PMMLUtil.unmarshal(new FileInputStream(pmmlFile));
    ModelEvaluatorFactory modelEvaluatorFactory = ModelEvaluatorFactory.newInstance();
    MiningModelEvaluator evaluator = (MiningModelEvaluator) modelEvaluatorFactory.newModelEvaluator(pmml);
    inputFieldMap = new HashMap<String, Field>();     
    Map<FieldName,String> args = new HashMap<FieldName, String>();
    Field curField = evaluator.getInputFields().get(0);
    args.put(curField.getName(), "1.0");
    Map<FieldName, ?> result  = evaluator.evaluate(args);
    
于 2018-02-19T10:09:00.913 回答
0

在这个问题上也花了几天时间。这并不简单。我的第三个建议涉及我专门为此目的编写的代码。

选项1

正如其他评论者所说,predict(Vector)现在可用。但是,您需要知道如何构造向量。如果您不这样做,请参阅选项 3。

选项 2

如果目标是避免设置 Spark 服务器(独立或集群模式),则可以在本地模式下启动 Spark。整个事情将在单个 JVM 中运行。

val spark = SparkSession.builder().config("spark.master", "local[*]").getOrCreate()
// create dataframe from file, or make it up from some data in memory
// use model.transform() to get predictions

但这会给您的预测模块带来不必要的依赖,并且会在运行时消耗 JVM 中的资源。此外,如果预测延迟很关键,例如,一旦请求进入,就在毫秒内做出预测,那么这个选项就太慢了。

选项 3

MLlib FeatureHasher的输出可以用作学习器的输入。该类适用于一种热编码,也适用于固定特征维度的大小。即使您的所有功能都是数字的,您也可以使用它。如果您在训练中使用它,那么您在预测时所需要的就是那里的散列逻辑。它作为火花变压器实现,因此在火花环境之外重复使用并不容易。所以我已经完成了将散列函数提取到库中的工作。您在训练期间照常应用 FeatureHasher 和您的学习者。然后这是您在预测时使用精简哈希器的方法:

// Schema and hash size must stay consistent across training and prediction
val hasher = new FeatureHasherLite(mySchema, myHashSize)

// create sample data-point and hash it
val feature = Map("feature1" -> "value1", "feature2" -> 2.0, "feature3" -> 3, "feature4" -> false)
val featureVector = hasher.hash(feature)

// Make prediction
val prediction = model.predict(featureVector)

您可以在我的 github tilayealemu/sparkmllite中查看详细信息。如果您想复制我的代码,请查看FeatureHasherLite.scala。还有示例代码和单元测试。如果您需要帮助,请随时创建问题。

于 2020-03-21T11:17:17.630 回答