我有一个情绪分析程序,可以使用循环中性网络预测给定的电影评论是正面的还是负面的。我正在为该程序使用 Deeplearning4j 深度学习库。现在我需要将该程序添加到 apache spark 管道中。
这样做时,我有一个MovieReviewClassifier
扩展的类,我必须将该类org.apache.spark.ml.classification.ProbabilisticClassifier
的一个实例添加到管道中。使用方法将构建模型所需的特征输入到程序中setFeaturesCol(String s)
。我添加的特性是有String
格式的,因为它们是一组用于情感分析的字符串。但功能应该是形式org.apache.spark.mllib.linalg.VectorUDT
。有没有办法将字符串转换为 Vector UDT?
我在下面附上了我的管道实现代码:
public class RNNPipeline {
final static String RESPONSE_VARIABLE = "s";
final static String INDEXED_RESPONSE_VARIABLE = "indexedClass";
final static String FEATURES = "features";
final static String PREDICTION = "prediction";
final static String PREDICTION_LABEL = "predictionLabel";
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("test-client").setMaster("local[2]");
sparkConf.set("spark.driver.allowMultipleContexts", "true");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(javaSparkContext);
// ======================== Import data ====================================
DataFrame dataFrame = sqlContext.read().format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load("/home/RNN3/WordVec/training.csv");
// Split in to train/test data
double [] dataSplitWeights = {0.7,0.3};
DataFrame[] data = dataFrame.randomSplit(dataSplitWeights);
// ======================== Preprocess ===========================
// Encode labels
StringIndexerModel labelIndexer = new StringIndexer().setInputCol(RESPONSE_VARIABLE)
.setOutputCol(INDEXED_RESPONSE_VARIABLE)
.fit(data[0]);
// Convert indexed labels back to original labels (decode labels).
IndexToString labelConverter = new IndexToString().setInputCol(PREDICTION)
.setOutputCol(PREDICTION_LABEL)
.setLabels(labelIndexer.labels());
// ======================== Train ========================
MovieReviewClassifier mrClassifier = new MovieReviewClassifier().setLabelCol(INDEXED_RESPONSE_VARIABLE).setFeaturesCol("Review");
// Fit the pipeline for training..setLabelCol.setLabelCol.setLabelCol.setLabelCol
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { labelIndexer, mrClassifier, labelConverter});
PipelineModel pipelineModel = pipeline.fit(data[0]);
}
}
Review 是包含要预测为正或负的字符串的特征列。
执行代码时出现以下错误:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Column Review must be of type org.apache.spark.mllib.linalg.VectorUDT@f71b0bce but was actually StringType.
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:42)
at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50)
at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:167)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:167)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:167)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:62)
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:121)
at RNNPipeline.main(RNNPipeline.java:82)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)