-1

我正在使用嵌套循环和外部 jar 比较 scala/spark 中的 2 个数据帧。

for (nrow <- dfm.rdd.collect) {   
  var mid = nrow.mkString(",").split(",")(0)
  var mfname = nrow.mkString(",").split(",")(1)
  var mlname = nrow.mkString(",").split(",")(2)  
  var mlssn = nrow.mkString(",").split(",")(3)  

  for (drow <- dfn.rdd.collect) {
    var nid = drow.mkString(",").split(",")(0)
    var nfname = drow.mkString(",").split(",")(1)
    var nlname = drow.mkString(",").split(",")(2)  
    var nlssn = drow.mkString(",").split(",")(3)  

    val fNameArray = Array(mfname,nfname)
    val lNameArray = Array (mlname,nlname)
    val ssnArray = Array (mlssn,nlssn)

    val fnamescore = Main.resultSet(fNameArray)
    val lnamescore = Main.resultSet(lNameArray)
    val ssnscore =  Main.resultSet(ssnArray)

    val overallscore = (fnamescore +lnamescore +ssnscore) /3

    if(overallscore >= .95) {
       println("MeditechID:".concat(mid)
         .concat(" MeditechFname:").concat(mfname)
         .concat(" MeditechLname:").concat(mlname)
         .concat(" MeditechSSN:").concat(mlssn)
         .concat(" NextGenID:").concat(nid)
         .concat(" NextGenFname:").concat(nfname)
         .concat(" NextGenLname:").concat(nlname)
         .concat(" NextGenSSN:").concat(nlssn)
         .concat(" FnameScore:").concat(fnamescore.toString)
         .concat(" LNameScore:").concat(lnamescore.toString)
         .concat(" SSNScore:").concat(ssnscore.toString)
         .concat(" OverallScore:").concat(overallscore.toString))
    }
  }
}

我希望做的是为外循环添加一些并行性,以便我可以创建一个 5 的线程池并从外循环的集合中提取 5 条记录,并将它们与内循环的集合进行比较,而不是连续执行此操作. 所以结果是我可以指定线程数,在任何给定时间从外部循环的集合处理中针对内部循环中的集合有 5 条记录。我该怎么做呢?

4

2 回答 2

4

让我们从分析你在做什么开始。您收集dfm到驱动程序的数据。然后,对于您从中收集数据的每个元素dfn,对其进行转换并计算每对元素的分数。

这在很多方面都是有问题的。首先,即使不考虑并行计算,对元素的转换dfn也与元素一样多次dfm。此外,您收集 的dfn每一行的数据dfm。这是很多网络通信(驱动程序和执行程序之间)。

如果你想使用 spark 来并行化你的计算,你需要使用 API(RDD、SQL 或 Datasets)。您似乎想使用 RDD 来执行笛卡尔积(这是 O(N*M) 所以要小心,可能需要一段时间)。

让我们首先在笛卡尔积之前转换数据,以避免每个元素多次执行它们。另外,为了清楚起见,让我们定义一个案例类来包含您的数据和一个将您的数据帧转换为该案例类的 RDD 的函数。

case class X(id : String, fname : String, lname : String, lssn : String)
def toRDDofX(df : DataFrame) = {
    df.rdd.map(row => {
        // using pattern matching to convert the array to the case class X
        row.mkString(",").split(",") match {
            case Array(a, b, c, d) => X(a, b, c, d)
        } 
    })
}

然后,我filter只保留分数超过.95但您可以使用的元组mapforeach...取决于您打算做什么。

val rddn = toRDDofX(dfn)
val rddm = toRDDofX(dfm)
rddn.cartesian(rddm).filter{ case (xn, xm) => {
    val fNameArray = Array(xm.fname,xn.fname)
    val lNameArray = Array(xm.lname,xn.lname)
    val ssnArray = Array(xm.lssn,xn.lssn)

    val fnamescore = Main.resultSet(fNameArray)
    val lnamescore = Main.resultSet(lNameArray)
    val ssnscore =  Main.resultSet(ssnArray)

    val overallscore = (fnamescore +lnamescore +ssnscore) /3
    // and then, let's say we filter by score
    overallscore > .95
}} 
于 2019-05-27T09:03:00.277 回答
1

这不是迭代 spark 数据帧的正确方法。主要关注的是dfm.rdd.collect. 如果数据框任意大,您最终会出现异常。这是因为该collect函数本质上将所有数据都带入了主节点。

另一种方法是使用 rdd 的 foreach 或 map 构造。

dfm.rdd.foreach(x => {
    // your logic
}  

现在您正在尝试在这里迭代第二个数据框。恐怕那是不可能的。优雅的方法是加入dfmanddfn并迭代结果数据集以计算您的函数。

于 2019-05-27T08:06:03.400 回答