I am trying to compute the Jaccard distance between some ids and their attributes, which are represented as SparseVectors.
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import VectorUDT as VectorUDTML
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, udf

sqlContext = SQLContext(sc)
df = sqlContext.read.load("path")

# Parse the string representation into an mllib SparseVector
# (Vectors.parse is only available in pyspark.mllib.linalg).
par = udf(lambda s: Vectors.parse(s), VectorUDT())
d = df.select("id", par("vect").alias("vect"))

# MinHashLSH expects ml vectors, so convert the mllib vectors to their ml counterparts.
as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDTML())
result = d.withColumn("vect", as_ml("vect"))

mh = MinHashLSH(inputCol="vect", outputCol="hashes", seed=12345, numHashTables=15)
model = mh.fit(result)
a = model.transform(result)

jd = model.approxSimilarityJoin(a, a, 1.0, distCol="JaccardDistance").select(
    col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance"))
df has two columns, id and sparse_vector. The id column is an alphanumeric id, and the sparse_vector column contains records like SparseVector(243775, {0: 1.0, 1: 1.0, 2: 1.0, 3: 1.0, 4: 1.0, 6: 1.0, 7: 1.0, 8: 1.0, 9: 1.0, 10: 1.0, 11: 1.0, 12: 1.0, 13: 1.0, 14: 1.0, 15: 1.0, 16: 1.0, 24: 1.0, 30: 1.0, 31: 1.0, 32: 1.0, 61: 1.0, 88: 1.0, 90: 1.0, 96: 1.0, 104: 1.0, 153: 1.0, 155: 1.0, 159: 1.0, 160: 1.0, 161: 1.0, 162: 1.0, 169: 1.0, 181: 1.0, 194: 1.0, 212: 1.0, 220: 1.0, 222: 1.0, 232: 1.0, 303: 1.0, 390: 1.0, 427: 1.0, 506: 1.0, 508: 1.0, 509: 1.0, 518: 1.0, 554: 1.0, 568: 1.0, 798: 1.0, 1431: 1.0, 2103: 1.0, 2139: 1.0, 3406: 1.0, 3411: 1.0, 3415: 1.0, 3429: 1.0, 3431: 1.0, 3440: 1.0, 3443: 1.0, 3449: 1.0})
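For reference, a DataFrame with this shape can be built directly with pyspark.ml.linalg.Vectors.sparse. This is only a minimal sketch; the two rows and their indices are made up, and the column names just mirror the description above.

from pyspark.ml.linalg import Vectors

# Hypothetical two-row example: an alphanumeric id plus a
# SparseVector(size, {index: value, ...}) per row.
rows = [
    ("id0", Vectors.sparse(243775, {0: 1.0, 1: 1.0, 4: 1.0})),
    ("id1", Vectors.sparse(243775, {1: 1.0, 3: 1.0, 4: 1.0})),
]
sample_df = sqlContext.createDataFrame(rows, ["id", "sparse_vector"])
sample_df.show(truncate=False)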
When I compute the Jaccard distances and write out the data, I am missing a lot of id pairs. There are 45k ids in total in the data, so the output should contain roughly 45k * 45k pairs.
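To check whether a specific pair is genuinely missing rather than, say, falling outside the distance threshold, the exact Jaccard distance of two binary sparse vectors can be computed by hand and compared against the join output. A minimal sketch (the vector arguments are placeholders for any two parsed SparseVectors):

def jaccard_distance(v1, v2):
    # Jaccard distance as MinHashLSH defines it: treat each vector as the
    # set of its nonzero indices and compute 1 - |intersection| / |union|.
    s1, s2 = set(v1.indices), set(v2.indices)
    return 1.0 - len(s1 & s2) / float(len(s1 | s2))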
On the other hand, when I compare 1k ids against all 45k ids and work through all the ids that way, somewhat like batches, I do get all the possible pairs. Any input would help. Also, can I parallelize the code so that the batched approach runs faster? I am running the code on an EMR cluster and have the resources to increase the cluster size.
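One way the batch approach could be scripted is to slice the ids and run one approxSimilarityJoin per slice against the full dataset. This is only a sketch under assumptions: batch_size and the output path are made-up values, and it reuses the model, a, and col from the code above.

from pyspark.sql.functions import col

# Sketch of a batched join: each slice of ids is compared against the
# whole dataset, one approxSimilarityJoin per slice.
batch_size = 1000  # assumption; tune to the cluster
ids = [r.id for r in a.select("id").distinct().collect()]
for start in range(0, len(ids), batch_size):
    batch = a.filter(col("id").isin(ids[start:start + batch_size]))
    pairs = model.approxSimilarityJoin(batch, a, 1.0, distCol="JaccardDistance")
    pairs.select(
        col("datasetA.id").alias("idA"),
        col("datasetB.id").alias("idB"),
        col("JaccardDistance"),
    ).write.mode("append").parquet("/mnt/jaccard_pairs")  # placeholder path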
The following script can be used to generate sample data with ids and artificially generated sparse vectors.
from random import randint
from collections import OrderedDict

with open('/mnt/lsh_data.csv', 'a') as the_file:
    # Tab-separated header, matching the tab-separated rows written below.
    the_file.write("id\tvect\n")
    for i in range(45000):
        b = "id" + str(i)
        # Random number of candidate indices for this vector.
        num_ent = randint(101, 195) + randint(102, 200)
        lis = [randint(0, 599999) for _ in range(num_ent)]
        lis.sort()
        # Drop duplicate indices while keeping the sorted order.
        l = list(OrderedDict.fromkeys(lis))
        data = []
        for j in range(len(l)):
            # Assign 1.0 or 0.0 at random to each retained index.
            data.append(1.0 if randint(0, 1) == 0 else 0.0)
        # Serialize as "(size,[indices],[values])", the format Vectors.parse accepts.
        b = b + "\t(600000," + str(l) + "," + str(data) + ")\n"
        the_file.write(b)
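Assuming this generated file is what "path" stands for in the snippet at the top (an assumption; the original does not say what format "path" holds), it can be read back as a tab-separated file with string columns, ready for the parsing udf:

# Read the generated tab-separated file into a DataFrame with string
# columns "id" and "vect"; par("vect") can then parse the vector strings.
df = sqlContext.read.csv("/mnt/lsh_data.csv", sep="\t", header=True)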