Starting from this example, I am using Locality Sensitive Hashing (LSH) in PySpark to find duplicate documents.
A few notes about my database: I have 4M text files, each with about 20K characters on average. Currently, I only consider the first 500 characters of each document.
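The truncation itself is roughly a substring over the text column (a minimal sketch, assuming the real documents are already loaded in a DataFrame with a text column; this step is not part of the fake-data code below):

from pyspark.sql.functions import substring, col

# df: DataFrame of the real documents, with a "text" column
# keep only the first 500 characters of each document
df = df.withColumn("text", substring(col("text"), 1, 500))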
When I increase the number of characters from 500 to 1000, I run into memory errors.
I have already experimented with the pipeline parameters. I know I can avoid the memory errors by increasing n in NGram and decreasing numHashTables in MinHashLSH, but this increases the false negatives too much.
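To make the tradeoff concrete, these are the two parameter sets involved (the same values appear as comments in the fake-data code below):

# toy values used in the fake-data example (very short strings)
n_gram = 2
hash_tables = 10

# values used on the real 4M documents: fits in memory and finishes in time,
# but misses too many near-duplicates (false negatives)
n_gram = 8
hash_tables = 3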
Are there other steps in the pipeline where I could improve performance?
My goal is to increase the number of characters from 500 to 2000 without memory errors or very long computation times (ideally, computation time < 6h).
Here is my code with fake data:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, trim, col, length, sort_array, array

spark = SparkSession.builder.getOrCreate()

# Parameters
# NGram
n_gram = 2  # on the real data I use n_gram=8, since each document has 500 characters
# MinHashLSH
hash_tables = 10  # on the real data I use hash_tables=3 to avoid memory errors and overly long computation times
# Jaccard distance threshold (used by approxSimilarityJoin)
treshold_test = 0.5
#Fake dataframe
df = spark.createDataFrame([
(0, "Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark"),
(1, "I wish Java could use case classes I wish Java could use case classes!!"),
(2, "Logistic, regression, models, are, neat, etc, etc, etc, etc, etc, etc, etc, etc"),
(3, "Hi I heard about Spork Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark"),
(4, "Hi I heard about Java Hi I heard about Java Hi I heard about Java Hi I heard about Java")
], ["id", "text"])
# remove punctuation and double spaces
df = df.withColumn("text", regexp_replace('text', r'\p{Punct}', ''))
df = df.withColumn("text", regexp_replace('text', r' (?= |$)', ''))
# trim whitespace and filter out texts that are too short
df = df.withColumn("text", trim(col("text")))\
.filter((col('text') != "") & (length(col('text')) > n_gram*3))
df.show(5,False)
# LSH pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, NGram, HashingTF, MinHashLSH
db = df
query = df
model = Pipeline(stages=[
    # empty pattern: split the text into single characters
    RegexTokenizer(
        pattern="", inputCol="text", outputCol="tokens", minTokenLength=1
    ),
    # build character n-grams
    NGram(n=n_gram, inputCol="tokens", outputCol="ngrams"),
    # hash the n-grams into sparse term-frequency vectors
    HashingTF(inputCol="ngrams", outputCol="vectors"),
    # MinHash signatures for the approximate Jaccard-distance join
    MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=hash_tables)
]).fit(db)
db_hashed = model.transform(db)
query_hashed = model.transform(query)
# approximate self-join: keep pairs with Jaccard distance below treshold_test
output = model.stages[-1].approxSimilarityJoin(db_hashed, query_hashed, treshold_test)
# similar pairs of documents:
output.filter(col('datasetA.id') != col('datasetB.id'))\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("datasetA.text").alias("textA"),
col("datasetB.text").alias("textB"),
col("distCol")).sort(col("distCol"))\
.withColumn('comb', sort_array(array(*('idA', 'idB'))))\
.dropDuplicates(['comb']).show()  # keep each unordered (idA, idB) pair only once