I am comparing two DataFrames in Scala/Spark using nested loops and an external jar.
// Collect each DataFrame to the driver once; the original re-collected dfn
// on every iteration of the outer loop.
val mRows = dfm.collect()
val nRows = dfn.collect()

for (mRow <- mRows) {
  // Read the first four columns by position. Joining with "," and splitting
  // again shifts the fields as soon as a value contains a comma.
  val Seq(mid, mfname, mlname, mlssn) = mRow.toSeq.take(4).map(v => s"$v")
  for (nRow <- nRows) {
    val Seq(nid, nfname, nlname, nlssn) = nRow.toSeq.take(4).map(v => s"$v")
    // Main.resultSet comes from the external jar and scores a pair of strings.
    val fnamescore = Main.resultSet(Array(mfname, nfname))
    val lnamescore = Main.resultSet(Array(mlname, nlname))
    val ssnscore = Main.resultSet(Array(mlssn, nlssn))
    val overallscore = (fnamescore + lnamescore + ssnscore) / 3
    if (overallscore >= 0.95) {
      println(
        s"MeditechID:$mid MeditechFname:$mfname MeditechLname:$mlname MeditechSSN:$mlssn " +
        s"NextGenID:$nid NextGenFname:$nfname NextGenLname:$nlname NextGenSSN:$nlssn " +
        s"FnameScore:$fnamescore LNameScore:$lnamescore SSNScore:$ssnscore " +
        s"OverallScore:$overallscore")
    }
  }
}
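What I would like to do is add some parallelism to the outer loop: create a thread pool of 5, pull 5 records at a time from the outer loop's collection, and compare them against the inner loop's collection concurrently instead of one after another. In other words, I want to be able to specify the number of threads so that, at any given time, 5 records from the outer collection are being processed against the inner collection. How can I do this?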
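
A minimal sketch of the setup described above, using a fixed-size thread pool and Scala Futures. It is only an illustration under stated assumptions: `Person`, `score`, `overall`, and `matches` are hypothetical names, `score` is a stand-in for `Main.resultSet` from the external jar (whose real behavior is not shown here), and plain collections take the place of the collected DataFrame rows.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelCompare {
  // Hypothetical record shape standing in for the first four columns of a Row.
  final case class Person(id: String, fname: String, lname: String, ssn: String)

  // Hypothetical stand-in for Main.resultSet: 1.0 if the two strings are
  // equal ignoring case, otherwise 0.0.
  def score(pair: Array[String]): Double =
    if (pair(0).equalsIgnoreCase(pair(1))) 1.0 else 0.0

  // Average of the three field scores, as in the original loop body.
  def overall(m: Person, n: Person): Double =
    (score(Array(m.fname, n.fname)) +
      score(Array(m.lname, n.lname)) +
      score(Array(m.ssn, n.ssn))) / 3

  // Compares every outer record against every inner record. One task is
  // submitted per outer record; the pool size caps how many run at once.
  def matches(outer: Seq[Person], inner: Seq[Person], threads: Int): Seq[(Person, Person, Double)] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val perOuter = Future.traverse(outer) { m =>
        Future {
          // Inner loop: runs sequentially inside a single pool thread.
          inner.map(n => (m, n, overall(m, n))).filter(_._3 >= 0.95)
        }
      }
      // Block until every task has finished, then flatten the per-record results.
      Await.result(perOuter, Duration.Inf).flatten
    } finally {
      pool.shutdown()
    }
  }
}
```

With `threads = 5`, all outer records are queued immediately but at most five are compared against the inner collection at any moment. The sketch returns the matches instead of printing inside the tasks, so output from different threads does not interleave; the caller can print the returned sequence afterwards.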