3

我正在使用 sc.textFile() 从 Mahout 中的 Alluxio 获取数据,但它是 spark RDD。我的程序进一步使用这个 spark RDD 作为 Mahout DRM,因此我需要将 RDD 转换为 DRM。所以我当前的代码保持稳定。

4

2 回答 2

2

可以通过以下步骤从 Apache Spark RDD 创建 Apache Mahout DRM:

  1. 将 RDD 的每一行转换为 Mahout Vector
  2. 用索引压缩 RDD(并交换,以便元组的形式(Long, Vector)
  3. 用 DRM 包装 RDD。

考虑以下示例代码:

val rddA = sc.parallelize(Array((1.0, 2.0, 3.0),
            ( 2.0, 3.0, 4.0),
            ( 4.0, 5.0, 6.0)))

val drmRddA: DrmRdd[Long] = rddA.map(a => new DenseVector(a))
                 .zipWithIndex()
                 .map(t => (t._2, t._1))

val drmA = drmWrap(rdd= drmRddA)

来源/更多信息/ 无耻的自我宣传(向底部):我的博客

于 2017-04-07T14:10:22.793 回答
1

转换数据的主要问题通常是 Mahout 使用整数来引用通用矩阵的行号和列号,但数据通常有自己的行和列键,它们是某种字符串 id。

Mahout 有一个名为 an 的对象,它保留了(实际上)IndexedDatasetSpark中的 id,但也创建了 Mahout DRM。这样做的好处是字典将在计算完成后将行和列的整数转换回您的 ID。BiMapsBiDictionaries

如果你有一个矩阵元素的 RDD[String, String] ,这将进行转换。如果您有一个行数组,您可以从这里开始编写您自己的转换代码。

https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala#L75

于 2017-04-07T15:36:14.983 回答