6

我正在使用 Apache Spark ML LSH 的 approxSimilarityJoin 方法加入 2 个数据集,但我看到了一些奇怪的行为。

在(内部)加入之后,数据集有点倾斜,但是每次一个或多个任务都需要花费过多的时间才能完成。

sparkui-1

如您所见,每个任务的中位数是 6 毫秒(我在较小的源数据集上运行它来测试),但 1 个任务需要 10 分钟。它几乎不使用任何 CPU 周期,它实际上连接了数据,但速度如此之慢。下一个最慢的任务在 14 秒内运行,记录多 4 倍,实际上溢出到磁盘。

如果你看sparkuisql

join本身是pos&hashValue(minhash)上两个数据集的内连接,按照minhash规范&udf计算匹配对之间的jaccard距离。

分解哈希表:

modelDataset.select(
      struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))

杰卡德距离函数:

 override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
    val xSet = x.toSparse.indices.toSet
    val ySet = y.toSparse.indices.toSet
    val intersectionSize = xSet.intersect(ySet).size.toDouble
    val unionSize = xSet.size + ySet.size - intersectionSize
    assert(unionSize > 0, "The union of two input sets must have at least 1 elements")
    1 - intersectionSize / unionSize
  }

处理数据集的加入:

// Do a hash join on where the exploded hash values are equal.
val joinedDataset = explodedA.join(explodedB, explodeCols)
  .drop(explodeCols: _*).distinct()

// Add a new column to store the distance of the two rows.
val distUDF = udf((x: Vector, y: Vector) => keyDistance(x, y), DataTypes.DoubleType)
val joinedDatasetWithDist = joinedDataset.select(col("*"),
  distUDF(col(s"$leftColName.${$(inputCol)}"), col(s"$rightColName.${$(inputCol)}")).as(distCol)
)

// Filter the joined datasets where the distance are smaller than the threshold.
joinedDatasetWithDist.filter(col(distCol) < threshold)

我尝试过缓存、重新分区甚至启用的组合spark.speculation,但都无济于事。

数据由必须匹配的带状疱疹地址文本组成: 53536, Evansville, WI=>53, 35, 36, ev, va, an, ns, vi, il, ll, le, wi 将与城市或邮政编码中存在拼写错误的记录有很短的距离。

这给出了非常准确的结果,但可能是连接偏差的原因。

我的问题是:

  • 什么可能导致这种差异?(一项任务需要很长时间,即使它的记录较少)
  • 如何在不损失准确性的情况下防止 minhash 中的这种偏差?
  • 有没有更好的方法来大规模地做到这一点?(我不能 Jaro-Winkler / levenshtein 将数百万条记录与位置数据集中的所有记录进行比较)
4

1 回答 1

1

可能有点晚了,但无论如何我都会在这里发布我的答案以帮助其他人。我最近在匹配拼写错误的公司名称时遇到了类似的问题(所有执行者都死了 MinHash LSH PySpark approxSimilarityJoin self-join on EMR cluster)。有人通过建议使用 NGrams 来减少数据偏差来帮助我。这对我帮助很大。您也可以尝试使用例如 3 克或 4 克。

我不知道数据有多脏,但是您可以尝试使用状态。它已经大大减少了可能匹配的数量。

真正帮助我提高匹配准确性的是通过在每个组件上运行标签传播算法对连接组件(由 MinHashLSH 生成的连接匹配组)进行后处理。这也允许您增加(NGrams 的)N,从而减轻数据倾斜的问题,将 jaccard 距离参数设置得不approxSimilarityJoin那么紧密,并使用标签传播进行后处理。

最后,我目前正在研究使用 skipgrams 来匹配它。我发现在某些情况下它工作得更好,并在一定程度上减少了数据偏差。

于 2020-07-07T00:47:43.070 回答