如何用rdd写入全局列表?
Li = []
Fn(list):
If list.value == 4:
Li.append(1)
rdd.mapValues(lambda x:fn(x))
当我尝试打印 Li 结果是:[]
我要做的是在转换 rdd 对象的同时转换另一个全局列表 Li1 。但是,当我这样做时,最后总是有一个空列表。Li1 永远不会被转换。
如何用rdd写入全局列表?
Li = []
Fn(list):
If list.value == 4:
Li.append(1)
rdd.mapValues(lambda x:fn(x))
当我尝试打印 Li 结果是:[]
我要做的是在转换 rdd 对象的同时转换另一个全局列表 Li1 。但是,当我这样做时,最后总是有一个空列表。Li1 永远不会被转换。
执行 s - 后将Li
值设置为的原因是因为 Spark 序列化函数(以及它引用的所有全局变量 - 它称为闭包)并发送到另一台机器 - 工作者。[]
mapValue
Fn
但是没有完全对应的机制可以将带有闭包的结果从工作人员发送回驱动程序。
为了接收结果 - 您需要从函数返回并使用类似take()
or的操作collect()
。但要小心——您不想发回的数据超出驱动程序的内存容量——否则 Spark 应用程序将抛出内存不足异常。
此外,您还没有对 RDDmapValues
转换执行操作 - 因此在您的示例中,没有对工作人员执行任何任务。
rdd = sc.parallelize([(x, x+1) for x in range(2, 5)])
def Fn(value):
return value*2
Li = rdd.mapValues(lambda x:Fn(x)).collect()
print Li
会导致
[(2, 6), (3, 8), (4, 10)]
按照您的问题描述(基于我对您想要做什么的理解):
L1 = range(20)
rdd = sc.parallelize(L1)
L2 = rdd.filter(lambda x: x % 2==0).collect()
print L2
>>> [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]