1

我正在尝试以 SparseVectors 的形式计算某些 id 及其属性之间的 Jaccard 距离。

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
sqlContext = SQLContext(sc)
df = sqlContext.read.load("path")
par = udf(lambda s: Vectors.parse(s), VectorUDT())
d = df_filtered.select("id",par("vect"))
from pyspark.ml.linalg import VectorUDT as VectorUDTML
as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDTML())
result = d.withColumn("<lambda>(vect)", as_ml("<lambda>(vect)"))
mh = MinHashLSH(inputCol="<lambda>(vect)", outputCol="hashes", seed=12345, numHashTables=15)
model = mh.fit(df)
a = model.transform(df)

jd = model.approxSimilarityJoin(a, a,1.0  , distCol="JaccardDistance").select(
     col("datasetA.id1").alias("idA"),
     col("datasetB.id1").alias("idB"),
     col("JaccardDistance"))

df 有两列,id并且sparse_vector. idcolumn 是一个字母数字 id 并且sparse_vectorcolumns 包含这样的记录SparseVector(243775, {0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0, 6: 1.0, 7: 1.0, 8: 1.0, 9: 1.0, 10: 1.0, 11: 1.0, 12: 1.0, 13: 1.0, 14: 1.0, 15: 1.0, 16: 1.0, 24: 1.0, 30: 1.0, 31: 1.0, 32: 1.0, 61: 1.0, 88: 1.0, 90: 1.0, 96: 1.0, 104: 1.0, 153: 1.0, 155: 1.0, 159: 1.0, 160: 1.0, 161: 1.0, 162: 1.0, 169: 1.0, 181: 1.0, 194: 1.0, 212: 1.0, 220: 1.0, 222: 1.0, 232: 1.0, 303: 1.0, 390: 1.0, 427: 1.0, 506: 1.0, 508: 1.0, 509: 1.0, 518: 1.0, 554: 1.0, 568: 1.0, 798: 1.0, 1431: 1.0, 2103: 1.0, 2139: 1.0, 3406: 1.0, 3411: 1.0, 3415: 1.0, 3429: 1.0, 3431: 1.0, 3440: 1.0, 3443: 1.0, 3449: 1.0}))

当我计算 Jaccard 并写下数据时,我错过了很多 id 对。数据中共有 45k 个身份,因此输出应包含大约 45k*45k 对。

此外,当我将 1k ids 与 45k ids 进行比较并以这种方式进行所有 ids 时,我得到了所有可能的对,有点像批次。任何输入都会有所帮助。另外,我可以并行化代码以便我更快地拥有批处理系统吗?我在 emr 集群上运行代码并且有资源来增加集群大小。

以下脚本可用于生成带有 id 的样本数据和人工生成的稀疏向量。

from random import randint
from collections import OrderedDict
with open('/mnt/lsh_data.csv', 'a') as the_file:
    the_file.write("id\vect\n")
    for i in range(45000):
        a = "id"
        b = a + str(i)
        num_ent = randint(101, 195) + randint(102, 200)
        lis = []
        for j in range(num_ent):
            lis.append(randint(0, 599999))
        lis.sort()
        l = list(OrderedDict.fromkeys(lis))
        data = []
        for j in range(len(l)):
            c = randint(0,1)
            if c == 0:
                data.append(1.0)
            else:
                data.append(0.0)
        b = b + "\t(600000,"+str(l)+","+str(data)+")\n"
        the_file.write(b)
4

2 回答 2

0

不是真正的答案,但评论太长了:

我不确定它是如何approxSimilarityJoin工作的以及预期的输出是什么。但是我检查了文档中给出的示例(http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html?highlight=minhash%20lsh#pyspark.ml.feature.MinHashLSH)这只有 3 x 3,即使在那里我们也没有得到完整的叉积(即使我们增加了阈值)。所以也许这不是预期的输出......

from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.ml.feature import MinHashLSH

data = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
        (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
        (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]

df = spark.createDataFrame(data, ["id", "features"])

mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)

model = mh.fit(df)
model.transform(df).head()

data2 = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]

df2 = spark.createDataFrame(data2, ["id", "features"])

model.approxSimilarityJoin(df, df2, 1.0, distCol="JaccardDistance").show()

于 2021-01-16T13:42:03.100 回答
0

检查approxSimilarityJoin源代码,您可以看到该函数首先对每个输入向量的局部敏感散列(LSH) 执行连接,该连接“以高概率将相似的输入项散列到相同的桶中”。然后它计算结果的距离。其效果是,仅计算在获取每个向量的 LSH 后最终位于同一桶中的向量之间的距离。这就是为什么您看不到输入数据集中所有对的距离,只看到最终位于同一存储桶中的向量对的原因。

此外,LSH 的输入是来自数据的输入向量和从初始种子派生的随机系数,这解释了为什么更改种子会改变分桶并因此改变输出。

如果您通过更改参数值进行实验,您可以看到分桶变化。MinHashLSH seed

于 2021-01-25T12:39:28.940 回答