14

我需要添加两个存储在两个文件中的矩阵。

latest1.txt和的内容latest2.txt具有下一个str:

1 2 3
4 5 6
7 8 9

我正在阅读这些文件,如下所示:

scala> val rows = sc.textFile(“latest1.txt”).map { line => val values = line.split(‘ ‘).map(_.toDouble)
    Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}

scala> val r1 = rows
r1: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14

scala> val rows = sc.textFile(“latest2.txt”).map { line => val values = line.split(‘ ‘).map(_.toDouble)
    Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}

scala> val r2 = rows
r2: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14

我想添加r1,r2。那么,有没有办法RDD[mllib.linalg.Vector]在 Apache-Spark 中添加这两个 s。

4

2 回答 2

20

这实际上是一个很好的问题。我经常使用 mllib,但没有意识到这些基本的线性代数运算不容易访问。

关键是基础微风向量具有您所期望的所有线性代数操作 - 当然包括您特别提到的基本元素明智添加。

然而,微风的实现是通过以下方式对外界隐藏的:

[private mllib]

那么,从外部世界/公共 API 的角度来看,我们如何访问这些原语?

其中一些已经暴露:例如平方和:

/**
 * Returns the squared distance between two Vectors.
 * @param v1 first Vector.
 * @param v2 second Vector.
 * @return squared distance between two Vectors.
 */
def sqdist(v1: Vector, v2: Vector): Double = { 
  ...
}

然而,这些可用方法的选择是有限的——实际上不包括基本操作,包括元素加法、减法、乘法等。

所以这是我能看到的最好的:

  • 将向量转换为微风:
  • 在微风中执行矢量操作
  • 从微风转换回 mllib 向量

这是一些示例代码:

val v1 = Vectors.dense(1.0, 2.0, 3.0)
val v2 = Vectors.dense(4.0, 5.0, 6.0)
val bv1 = new DenseVector(v1.toArray)
val bv2 = new DenseVector(v2.toArray)

val vectout = Vectors.dense((bv1 + bv2).toArray)
vectout: org.apache.spark.mllib.linalg.Vector = [5.0,7.0,9.0]
于 2015-01-31T01:35:00.470 回答
3

以下代码公开了 Spark 的 asBreeze 和 fromBreeze 方法。该解决方案支持SparseVector与使用vector.toArray. 请注意,Spark 将来可能会更改其 API,并且已经重命名toBreezeasBreeze.

package org.apache.spark.mllib.linalg
import breeze.linalg.{Vector => BV}
import org.apache.spark.sql.functions.udf

/** expose vector.toBreeze and Vectors.fromBreeze
  */
object VectorUtils {

  def fromBreeze(breezeVector: BV[Double]): Vector = {
    Vectors.fromBreeze( breezeVector )
  }

  def asBreeze(vector: Vector): BV[Double] = {
    // this is vector.asBreeze in Spark 2.0
    vector.toBreeze
  }

  val addVectors = udf {
    (v1: Vector, v2: Vector) => fromBreeze( asBreeze(v1) + asBreeze(v2) )
  }

}

有了这个你可以做到df.withColumn("xy", addVectors($"x", $"y"))

于 2016-12-13T08:20:43.200 回答