
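Is there a way in Scala to define an explicit function for an RDD transformation that takes additional parameters?

For example, the Python code below uses a lambda expression to combine the map transformation (which expects a function of one argument) with the function my_power (which actually takes two arguments).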

def my_power(a, b):
    res = a ** b
    return res

def my_main(sc, n):
    inputRDD = sc.parallelize([1, 2, 3, 4])
    powerRDD = inputRDD.map(lambda x: my_power(x, n))
    resVAL = powerRDD.collect()
    for item in resVAL:
        print(item)

However, when I try the equivalent implementation in Scala, I get a Task not serializable exception.

val myPower: (Int, Int) => Int = (a: Int, b: Int) => {
  val res: Int = math.pow(a, b).toInt
  res
}

def myMain(sc: SparkContext, n: Int): Unit = {
  val inputRDD: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4))
  val squareRDD: RDD[Int] = inputRDD.map( (x: Int) => myPower(x, n) )
  val resVAL: Array[Int] = squareRDD.collect()
  for (item <- resVAL){
    println(item)
  }
}

2 Answers


It worked for me when written this way.

package examples

import org.apache.log4j.Level
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object RDDTest extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)
  val spark = SparkSession.builder()
    .appName(this.getClass.getName)
    .config("spark.master", "local[*]").getOrCreate()


  val myPower: (Int, Int) => Int = (a: Int, b: Int) => {
    val res: Int = math.pow(a, b).toInt
    res
  }
  val scontext = spark.sparkContext
  myMain(scontext, 10)

  def myMain(sc: SparkContext, n: Int): Unit = {
    val inputRDD: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4))
    val squareRDD: RDD[Int] = inputRDD.map((x: Int) => myPower(x, n))
    val resVAL: Array[Int] = squareRDD.collect()
    for ( item <- resVAL ) {
      println(item)
    }
  }
}


Result:

1
1024
59049
1048576

Another option is to broadcast n with sc.broadcast and read the broadcast value inside the closure passed to map.
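The broadcast option mentioned above could look roughly like the sketch below. The object name BroadcastPowerSketch and the variable nBroadcast are made up for illustration and do not appear in the original answer. The function is written as a method of a top-level object, on the assumption that this keeps the closure from capturing a non-serializable enclosing instance. For a single Int, a broadcast is probably more than is needed, since a local Int captured by the closure is serialized with the task anyway. Broadcasting pays off mainly for larger read-only values.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical object name, used only for this sketch.
object BroadcastPowerSketch {
  // A method of a top-level object: calling it from a closure does not
  // require capturing an instance of an enclosing class.
  def myPower(a: Int, b: Int): Int = math.pow(a, b).toInt

  def myMain(sc: SparkContext, n: Int): Unit = {
    val inputRDD: RDD[Int] = sc.parallelize(Seq(1, 2, 3, 4))

    // Ship n to the executors once as a read-only broadcast variable.
    val nBroadcast: Broadcast[Int] = sc.broadcast(n)

    // The closure captures only the broadcast handle, which is serializable.
    val powerRDD: RDD[Int] = inputRDD.map(x => myPower(x, nBroadcast.value))

    powerRDD.collect().foreach(println)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("BroadcastPowerSketch")
      .master("local[*]")
      .getOrCreate()
    try myMain(spark.sparkContext, 10)
    finally spark.stop()
  }
}
```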

Answered 2019-08-12T19:39:45.490

Simply adding a local variable as an alias for the function is enough:

val myPower: (Int, Int) => Int = (a: Int, b: Int) => {
  val res: Int = math.pow(a, b).toInt
  res
}

def myMain(sc: SparkContext, n: Int): Unit = {
  val inputRDD: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4))

  val myPowerAlias = myPower
  val squareRDD: RDD[Int] = inputRDD.map( (x: Int) => myPowerAlias(x, n) )

  val resVAL: Array[Int] = squareRDD.collect()
  for (item <- resVAL){
    println(item)
  }
}
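A likely reason the alias helps, assuming myPower and myMain are members of a class that is not Serializable (the question does not show the enclosing class): referring to the field myPower inside the lambda really means this.myPower, so the closure captures the whole enclosing instance. Copying the function into a local val means the closure captures only the function value and n. The Spark-free sketch below illustrates this with plain Java serialization. The names Job, viaField, viaAlias and isJavaSerializable are hypothetical, and the behaviour described assumes Scala 2.12 or later, where function literals are serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical names throughout; this only mimics what a serializer has to do.
object ClosureCaptureSketch {
  // Stand-in for an enclosing class that does not extend Serializable.
  class Job {
    val myPower: (Int, Int) => Int = (a, b) => math.pow(a, b).toInt

    // `myPower` here means `this.myPower`, so the closure captures `this`.
    def viaField(n: Int): Int => Int = (x: Int) => myPower(x, n)

    // The local alias is captured instead of `this`.
    def viaAlias(n: Int): Int => Int = {
      val myPowerAlias = myPower
      (x: Int) => myPowerAlias(x, n)
    }
  }

  // Returns true if Java serialization accepts the object graph.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val job = new Job
    println(s"field reference serializable: ${isJavaSerializable(job.viaField(2))}")
    println(s"local alias serializable:     ${isJavaSerializable(job.viaAlias(2))}")
  }
}
```

Under those assumptions, the field-based closure is expected to fail serialization while the alias-based one succeeds, which matches the Task not serializable symptom and the fix shown above.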
Answered 2019-08-13T14:08:38.950