0

我正在使用 pyspark 分析数据集,我有点惊讶为什么即使我使用的是广播的变量,以下代码也能正常工作。

有问题的变量是video,在函数中使用filter,在连接之后。

seed = random.randint(0,999)

# df is a dataframe
# video is just one randomly sampled element
video = df.sample(False,0.001,seed).head()

# just a python list
otherVideos = [ (22,0.32),(213,0.43) ]

# transform the python list into an rdd 
resultsRdd = sc.parallelize(similarVideos)

rdd = df.rdd.map(lambda row: (row.video_id,row.title))

# perform a join between resultsRdd and rdd
# note that video.title was NOT broadcast
(resultsRdd
   .join(rdd)
   .filter(lambda pair: pair[1][1] != video.title) # HERE!!!
   .takeOrdered(10, key= lambda pair: -pair[1][0]))

我在独立模式下使用 pyspark,pyspark-submit 有以下参数:

--num-executors 12 --executor-cores 4 --executor-memory 1g --master local[*]

另外,我在jupyter(新的 ipython-notebooks)上运行之前的代码。

4

1 回答 1

2

[重新发表评论作为答案。]

对于这个概念,我认为这个关于理解闭包的链接是一本很好的读物。本质上,您不需要广播 RDD 范围之外的所有变量,因为闭包(在您的情况下video)将被序列化并发送到每个执行程序和任务以在任务执行期间访问。当广播的数据集很大时,广播变量很有用,因为它将作为只读缓存存在,该缓存将位于执行程序上,并且不会随着在该执行程序上运行的每个任务进行序列化/发送/反序列化。

于 2015-10-26T14:28:11.957 回答