0

这几天我一直在为这个问题头疼。感觉它应该很容易直观......真的希望有人能提供帮助!

我已经org.nd4j.linalg.api.ndarray.INDArray从一些半结构化数据中构建了一个单词出现,如下所示:

import org.nd4j.linalg.factory.Nd4j
import org.nd4s.Implicits._

val docMap = collection.mutable.Map[Int,Map[Int,Int]] //of the form Map(phrase -> Map(phrasePosition -> word)
val words = ArrayBuffer("word_1","word_2","word_3",..."word_n")
val windows = ArrayBuffer("$phrase,$phrasePosition_1","$phrase,$phrasePosition_2",..."$phrase,$phrasePosition_n") 

var matrix = Nd4j.create(windows.length*words.length).reshape(windows.length,words.length)
for (row <- matrix.shape(0)){
    for(column <- matrix.shape(1){
        //+1 to (row,column) if word occurs at phrase, phrasePosition indicated by window_n.
    }
}
val finalmatrix = matrix.T.dot(matrix) // to get co-occurrence matrix

到目前为止,一切都很好...

在这一点的下游,我需要将数据集成到 Spark 中的现有管道中,并使用 pca 等的实现,所以我需要创建一个 DataFrame,或者至少是一个 RDD。如果我提前知道单词和/或窗口的数量,我可以执行以下操作:

case class Row(window : String, word_1 : Double, word_2 : Double, ...etc)

val dfSeq = ArrayBuffer[Row]()
for (row <- matrix.shape(0)){
    dfSeq += Row(windows(row),matrix.get(NDArrayIndex.point(row), NDArrayIndex.all()))
}
sc.parallelize(dfSeq).toDF("window","word_1","word_2",...etc)

但是窗口和单词的数量是在运行时确定的。我正在寻找一个 WindowsxWordsorg.apache.spark.sql.DataFrame作为输出,输入是一个 WindowsxWordsorg.nd4j.linalg.api.ndarray.INDArray

提前感谢您提供的任何帮助。

4

1 回答 1

0

好的,所以经过几天的工作,看起来简单的答案是:没有。Nd4j事实上,尝试在这种情况下使用似乎是一个坏主意,原因如下:

  1. INDArray一旦你把数据从本机格式中取出,就(真的)很难。
  2. 即使使用像guava这样的东西, .data() 方法也会把所有东西都放在堆上,这很快就会变得昂贵。
  3. 你不得不编译一个程序集 jar 或使用 hdfs 等来处理库本身。

我也确实考虑过使用Breeze,它实际上可能提供了一个可行的解决方案,但也存在一些相同的问题,并且不能用于分布式数据结构。

不幸的是,使用本机 Spark / Scala 数据类型,虽然一旦你知道如何更容易,是 - 对于像我这样来自 Python + numpy + pandas 天堂的人来说 - 痛苦地令人费解和丑陋。

尽管如此,我确实成功地实施了这个解决方案:

import org.apache.spark.mllib.linalg.{Vectors,Vector,Matrix,DenseMatrix,DenseVector}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

//first make a pseudo-matrix from Scala Array[Double]:
var rowSeq = Seq.fill(windows.length)(Array.fill(words.length)(0d))

//iterate through 'rows' and 'columns' to fill it:
for (row 0 until windows.length){
    for (column 0 until words.length){
        // rowSeq(row)(column) += 1 if word occurs at phrase, phrasePosition indicated by window_n.
    }
}

//create Spark DenseMatrix
val rows : Array[Double] = rowSeq.transpose.flatten.toArray
val matrix = new DenseMatrix(windows.length,words.length,rows)

我需要 Nd4J 的主要操作之一是,matrix.T.dot(matrix)但事实证明你不能将 2 个 Type 矩阵相乘org.apache.spark.mllib.linalg.DenseMatrix,其中一个 (A) 必须是 aorg.apache.spark.mllib.linalg.distributed.RowMatrix并且 - 你猜对了 - 你不能调用matrix.transpose()a RowMatrix,只上一个DenseMatrix!由于它与问题并不真正相关,因此我将省略该部分,除了解释该步骤产生的内容是RowMatrix. 解决方案的最后一部分也应在此处此处获得信用:

val rowMatrix : [RowMatrix] = transposeAndDotDenseMatrix(matrix)

// get DataFrame from RowMatrix via DenseMatrix
val newdense = new DenseMatrix(rowMatrix.numRows().toInt,rowMatrix.numCols().toInt,rowMatrix.rows.collect.flatMap(x => x.toArray)) // the call to collect() here is undesirable...
val matrixRows = newdense.rowIter.toSeq.map(_.toArray)
val df = spark.sparkContext.parallelize(matrixRows).toDF("Rows")

// then separate columns:
val df2 = (0 until words.length).foldLeft(df)((df, num) => 
df.withColumn(words(num), $"Rows".getItem(num)))
.drop("Rows")

很想听听对此的改进和建议,谢谢。

于 2019-04-20T11:56:30.680 回答