我有一个大约 50k 不同行和 2 列的表。您可以将每一行视为一部电影,将列视为该电影的属性 - “ID”:该电影的 id,“Tags”:电影的一些内容标签,以每部电影的字符串列表的形式。
数据看起来像这样:
movie_1, ['浪漫','喜剧','英语']; movie_2, ['动作','功夫','中国']
我的目标是首先根据它们对应的标签计算每部电影之间的提花相似度,一旦完成,我将能够知道每部电影(例如我选择movie_1),其他最相似的前5部电影是什么用这个(在这种情况下是movie_1)。而且我希望获得前 5 名的结果不仅是 movie_1 本身,还希望获得所有电影的前 5 名。
我尝试使用 Python 来解决这个问题,但是运行时间在这里是一个很大的挑战。即使我使用多处理,在 6 核上运行,总运行时间仍然持续了 20 多个小时。
Python代码如下:
import pandas as pd
from collections import Counter
import numpy as np
from multiprocessing import Pool
import time
col_names=['movie_id','tag_name']
df=pd.read_csv("movies.csv",names=col_names)
movie_ids=df['movie_id'].tolist()
tag_list=df['tag_name'].tolist()
def jaccard_similarity(string1, string2):
intersection = set(string1).intersection(set(string2))
union = set(string1).union(set(string2))
return len(intersection)/float(len(union))
def jc_results(movie_id):
result=Counter()
this_index=movie_ids.index(movie_id)
for another_id in movie_ids:
that_index=movie_ids.index(another_id)
if another_id==movie_id:
continue
else:
tag_1=tag_list[this_index]
tag_2=tag_list[that_index]
jaccard = jaccard_similarity(tag_1,tag_2)
result[(movie_id,another_id)]=jaccard
return result.most_common(10)
from multiprocessing import Pool
pool=Pool(6)
results={}
for movie_id in movie_ids:
results[movie_id]=pool.apply_async(jc_results,args=(movie_id,))
pool.close()
pool.join()
for movie_id, res in results.items():
results[movie_id] = res.get()
然后我想切换到Pyspark,但是我对spark python还是很陌生,用它写了几行后就卡住了,实际上我唯一取得的进展是使用sc.textFile将数据读入RDD ...已经阅读了现有的帖子,但他们都在使用 Scala。如果有人可以帮助或提供有关 Pyspark 的任何指导,那就太好了。非常感谢!