我正在使用 Apache Spark ML LSH 的 approxSimilarityJoin 方法加入 2 个数据集,但我看到了一些奇怪的行为。
在(内部)加入之后,数据集有点倾斜,但是每次一个或多个任务都需要花费过多的时间才能完成。
如您所见,每个任务的中位数是 6 毫秒(我在较小的源数据集上运行它来测试),但 1 个任务需要 10 分钟。它几乎不使用任何 CPU 周期,它实际上连接了数据,但速度如此之慢。下一个最慢的任务在 14 秒内运行,记录多 4 倍,实际上溢出到磁盘。
join本身是pos&hashValue(minhash)上两个数据集的内连接,按照minhash规范&udf计算匹配对之间的jaccard距离。
分解哈希表:
modelDataset.select(
struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))
杰卡德距离函数:
override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
val xSet = x.toSparse.indices.toSet
val ySet = y.toSparse.indices.toSet
val intersectionSize = xSet.intersect(ySet).size.toDouble
val unionSize = xSet.size + ySet.size - intersectionSize
assert(unionSize > 0, "The union of two input sets must have at least 1 elements")
1 - intersectionSize / unionSize
}
处理数据集的加入:
// Do a hash join on where the exploded hash values are equal.
val joinedDataset = explodedA.join(explodedB, explodeCols)
.drop(explodeCols: _*).distinct()
// Add a new column to store the distance of the two rows.
val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y), DataTypes.DoubleType)
val joinedDatasetWithDist = joinedDataset.select(col("*"),
distUDF(col(s"$leftColName.${$(inputCol)}"), col(s"$rightColName.${$(inputCol)}")).as(distCol)
)
// Filter the joined datasets where the distance are smaller than the threshold.
joinedDatasetWithDist.filter(col(distCol) < threshold)
我尝试过缓存、重新分区甚至启用的组合spark.speculation
,但都无济于事。
数据由必须匹配的带状疱疹地址文本组成:
53536, Evansville, WI
=>53, 35, 36, ev, va, an, ns, vi, il, ll, le, wi
将与城市或邮政编码中存在拼写错误的记录有很短的距离。
这给出了非常准确的结果,但可能是连接偏差的原因。
我的问题是:
- 什么可能导致这种差异?(一项任务需要很长时间,即使它的记录较少)
- 如何在不损失准确性的情况下防止 minhash 中的这种偏差?
- 有没有更好的方法来大规模地做到这一点?(我不能 Jaro-Winkler / levenshtein 将数百万条记录与位置数据集中的所有记录进行比较)