我正在使用 pyspark 分析数据集,我有点惊讶为什么即使我使用的是未广播的变量,以下代码也能正常工作。
有问题的变量是video
,在函数中使用filter
,在连接之后。
seed = random.randint(0,999)
# df is a dataframe
# video is just one randomly sampled element
video = df.sample(False,0.001,seed).head()
# just a python list
otherVideos = [ (22,0.32),(213,0.43) ]
# transform the python list into an rdd
resultsRdd = sc.parallelize(similarVideos)
rdd = df.rdd.map(lambda row: (row.video_id,row.title))
# perform a join between resultsRdd and rdd
# note that video.title was NOT broadcast
(resultsRdd
.join(rdd)
.filter(lambda pair: pair[1][1] != video.title) # HERE!!!
.takeOrdered(10, key= lambda pair: -pair[1][0]))
我在独立模式下使用 pyspark,pyspark-submit 有以下参数:
--num-executors 12 --executor-cores 4 --executor-memory 1g --master local[*]
另外,我在jupyter(新的 ipython-notebooks)上运行之前的代码。