
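I have a sentiment analysis program that uses a recurrent neural network to predict whether a given movie review is positive or negative. I am using the Deeplearning4j deep learning library for the program. Now I need to add the program to an Apache Spark pipeline.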

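To do this, I have a class MovieReviewClassifier that extends org.apache.spark.ml.classification.ProbabilisticClassifier, and I have to add an instance of that class to the pipeline. The features needed to build the model are passed to the program with the method setFeaturesCol(String s). The features I am adding are in String format, since they are a set of strings used for sentiment analysis. But the features are expected to be of type org.apache.spark.mllib.linalg.VectorUDT. Is there a way to convert the strings to a Vector UDT?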

I have attached my pipeline implementation code below:

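import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class RNNPipeline {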
    final static String RESPONSE_VARIABLE =  "s";
    final static String INDEXED_RESPONSE_VARIABLE =  "indexedClass";
    final static String FEATURES = "features";
    final static String PREDICTION = "prediction";
    final static String PREDICTION_LABEL = "predictionLabel";

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("test-client").setMaster("local[2]");
        sparkConf.set("spark.driver.allowMultipleContexts", "true");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(javaSparkContext);

        // ======================== Import data ====================================
        DataFrame dataFrame =    sqlContext.read().format("com.databricks.spark.csv")
                .option("inferSchema", "true")
                .option("header", "true")
                .load("/home/RNN3/WordVec/training.csv");

        // Split into train/test data
        double [] dataSplitWeights = {0.7,0.3};
        DataFrame[] data = dataFrame.randomSplit(dataSplitWeights);



        // ======================== Preprocess ===========================



        // Encode labels
        StringIndexerModel labelIndexer = new StringIndexer().setInputCol(RESPONSE_VARIABLE)
                .setOutputCol(INDEXED_RESPONSE_VARIABLE)
                .fit(data[0]);


        // Convert indexed labels back to original labels (decode labels).
        IndexToString labelConverter = new IndexToString().setInputCol(PREDICTION)
                .setOutputCol(PREDICTION_LABEL)
                .setLabels(labelIndexer.labels());


        // ======================== Train ========================



        MovieReviewClassifier mrClassifier = new MovieReviewClassifier().setLabelCol(INDEXED_RESPONSE_VARIABLE).setFeaturesCol("Review");



        // Fit the pipeline on the training split.
        Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { labelIndexer, mrClassifier, labelConverter});
        PipelineModel pipelineModel = pipeline.fit(data[0]);

    }
}

Review is the feature column containing the strings to be predicted as positive or negative.

I get the following error when I execute the code:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Column Review must be of type org.apache.spark.mllib.linalg.VectorUDT@f71b0bce but was actually StringType.
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:42)
    at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50)
    at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
    at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:167)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:167)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
    at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:167)
    at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:62)
    at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:121)
    at RNNPipeline.main(RNNPipeline.java:82)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

2 Answers


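According to its documentation:

User-defined type for Vector which allows easy interaction with SQL via DataFrame.

And indeed, in the ML library guide:

DataFrame supports many basic and structured types; see the Spark SQL datatype reference for a list of supported types. In addition to the types listed in the Spark SQL guide, DataFrame can use ML Vector types.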

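Given that, and the fact that you are being asked for an org.apache.spark.sql.types.UserDefinedType<Vector>, you can probably get by with passing a DenseVector or a SparseVector created from your String.

The conversion from a String ("Review"???) to a Vector depends on how you have organized your data.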
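
As one illustration of such a conversion, here is a minimal sketch that assumes a bag-of-words encoding built with the hashing trick. That encoding is my assumption; nothing in the question says the reviews should be represented this way. The class name HashedTermCounts and the bucket count are hypothetical, and the code uses only the JDK. It produces the sorted index/count pairs that a sparse vector is built from; as far as I know, those could then be copied into arrays and handed to Vectors.sparse(size, indices, values).

```java
import java.util.TreeMap;

/** Hypothetical helper: encodes a review as hashed term counts (bag of words). */
final class HashedTermCounts {

    /**
     * Lower-cases the review, splits it on whitespace, hashes every token into
     * a bucket in [0, numFeatures) and counts how many tokens land in each bucket.
     * A TreeMap keeps the bucket indices in ascending order.
     */
    static TreeMap<Integer, Double> encode(String review, int numFeatures) {
        TreeMap<Integer, Double> counts = new TreeMap<>();
        String trimmed = review.trim().toLowerCase();
        if (trimmed.isEmpty()) {
            return counts; // no tokens, so an all-zero sparse vector
        }
        for (String token : trimmed.split("\\s+")) {
            // floorMod keeps the index non-negative even for negative hash codes
            int index = Math.floorMod(token.hashCode(), numFeatures);
            counts.merge(index, 1.0, Double::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Each entry is (bucket index -> number of tokens hashed to that bucket)
        System.out.println(encode("a bad bad plot", 1024));
    }
}
```

Different tokens can collide in the same bucket, so the bucket count trades memory against collisions.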

Answered 2016-02-19T14:39:20.880

The way to convert the String type to the Vector UDT is to use word2vec. I had to add a word2vec object to the Spark pipeline to do the conversion.
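
To show what that conversion amounts to, here is a minimal sketch in plain Java rather than the actual pipeline code. It assumes the word vectors have already been learned and looks them up in a hand-written map. The class AveragedWordVectors, its methods and the toy two-dimensional embeddings are all hypothetical. Spark's Word2Vec documentation describes the transformed document vector as the average of the document's word vectors. The exact normalization inside Spark may differ from this sketch, which divides by the number of known words only. In the pipeline itself this would presumably be a Tokenizer stage followed by a Word2Vec stage, since Word2Vec expects a column of string arrays, with the Word2Vec output column being the one passed to setFeaturesCol.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: turns a review string into a fixed-length vector. */
final class AveragedWordVectors {

    /** Lower-cases and splits on whitespace; a stand-in for a real tokenizer. */
    static String[] tokenize(String review) {
        String trimmed = review.trim().toLowerCase();
        return trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
    }

    /**
     * Averages the embeddings of all tokens found in wordVectors.
     * Unknown tokens are skipped; if no token is known the result is all zeros.
     */
    static double[] embed(String review, Map<String, double[]> wordVectors, int dim) {
        double[] sum = new double[dim];
        int known = 0;
        for (String token : tokenize(review)) {
            double[] vector = wordVectors.get(token);
            if (vector == null) {
                continue; // word not in the vocabulary
            }
            for (int i = 0; i < dim; i++) {
                sum[i] += vector[i];
            }
            known++;
        }
        if (known > 0) {
            for (int i = 0; i < dim; i++) {
                sum[i] /= known;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // Toy embeddings, made up for the example
        Map<String, double[]> wordVectors = new HashMap<>();
        wordVectors.put("great", new double[] {1.0, 0.0});
        wordVectors.put("movie", new double[] {0.0, 1.0});

        double[] features = embed("Great movie", wordVectors, 2);
        System.out.println(Arrays.toString(features)); // prints [0.5, 0.5]
    }
}
```

Every review ends up with the same number of dimensions regardless of its length, which is what a features column of vector type requires.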

Answered 2016-03-23T06:38:36.517