2

所以我知道 Spark 可以在单个 RDD 上执行迭代算法,例如逻辑回归。

    val points = spark.textFile(...).map(parsePoint).cache()
    var w = Vector.random(D) // current separating plane
    for (i <- 1 to ITERATIONS) {
      val gradient = points.map(p =>
        (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
      ).reduce(_ + _)
      w -= gradient
    }

上面的例子是迭代的,因为它维护了一个w在每次迭代后更新的全局状态,并且它的更新值在下一次迭代中使用。这个功能在 Spark 流中是否可行?考虑同样的例子,除了现在points是一个 DStream。在这种情况下,您可以创建一个新的 DStream 来计算梯度

val gradient = points.map(p =>
            (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
          ).reduce(_ + _)

但是您将如何处理全局状态w。似乎w也必须是一个 DStream (updateStateByKey可能使用),但是它的最新值需要以某种方式传递给points我认为不可能的 map 函数。我不认为 DStreams 可以以这种方式进行通信。我是正确的,还是可以在 Spark Streaming 中进行这样的迭代计算?

4

2 回答 2

3

我刚刚发现使用 foreachRDD 函数非常简单。MLlib 实际上提供了可以使用 DStreams 训练的模型,我在streamingLinearAlgorithm代码中找到了答案。看起来您可以将全局更新变量本地保存在驱动程序中并在 .foreachRDD 中更新它,因此实际上不需要将其转换为 DStream 本身。因此,您可以将其应用于我提供的示例,例如

points.foreachRDD{(rdd,time) =>

     val gradient=rdd.map(p=> (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
     )).reduce(_ + _)

  w -= gradient

  }
于 2015-03-18T15:17:47.587 回答
-1

嗯......你可以通过并行化你的迭代器然后折叠它来更新你的渐变来实现一些东西。

另外...我认为您应该将 Spark Streaming 排除在外,因为这个问题看起来不像具有将其链接到任何类型的 Streaming 要求的任何功能。

// So, assuming... points is somehow a RDD[ Point ]
val points = sc.textFile(...).map(parsePoint).cache()
var w = Vector.random(D)

// since fold is ( T )( ( T, T) => T ) => T
val temps = sc.parallelize( 1 to ITERATIONS ).map( w )

// now fold over temps.
val gradient = temps.fold( w )( ( acc, v ) => {
  val gradient = points.map( p =>
    (1 / (1 + exp(-p.y*(acc dot p.x))) - 1) * p.y * p.x
  ).reduce(_ + _)
  acc - gradient
}
于 2015-03-17T10:14:26.627 回答