1

我在 Scala 中有一个 CoordinateMatrix 格式的矩阵。矩阵是稀疏的,整体看起来像(在 coo_matrix.entries.collect 上),

Array[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = Array(
  MatrixEntry(0,0,-1.0), MatrixEntry(0,1,-1.0), MatrixEntry(1,0,-1.0),
  MatrixEntry(1,1,-1.0), MatrixEntry(1,2,-1.0), MatrixEntry(2,1,-1.0), 
  MatrixEntry(2,2,-1.0), MatrixEntry(0,3,-1.0), MatrixEntry(0,4,-1.0), 
  MatrixEntry(0,5,-1.0), MatrixEntry(3,0,-1.0), MatrixEntry(4,0,-1.0), 
  MatrixEntry(3,3,-1.0), MatrixEntry(3,4,-1.0), MatrixEntry(4,3,-1.0),
  MatrixEntry(4,4,-1.0))

这只是一个很小的样本量。矩阵的大小为 N x N(其中 N = 100 万),尽管其中大部分是稀疏的。在 Spark Scala 中获取该矩阵的行和的有效方法之一是什么?目标是创建一个由行总和组成的新 RDD,即大小为 N,其中第一个元素是 row1 的行总和,依此类推..

我总是可以将此坐标矩阵转换为 IndexedRowMatrix 并运行一个 for 循环来一次计算一次迭代的行和,但这不是最有效的方法。

任何想法都非常感谢。

4

1 回答 1

4

由于改组,这将非常昂贵(这是您在此处无法真正避免的部分),但您可以将条目转换为PairRDD并通过键减少:

import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, CoordinateMatrix}
import org.apache.spark.rdd.RDD

val mat: CoordinateMatrix = ???
val rowSums: RDD[Long, Double)] = mat.entries
  .map{case MatrixEntry(row, _, value) => (row, value)}
  .reduceByKey(_ + _)

与基于以下的解决方案不同indexedRowMatrix

import org.apache.spark.mllib.linalg.distributed.IndexedRow

mat.toIndexedRowMatrix.rows.map{
  case IndexedRow(i, values) => (i, values.toArray.sum)
}

它不需要groupBy转换或中间SparseVectors

于 2015-10-23T15:58:47.380 回答