-1

如何用rdd写入全局列表?

 Li = []

 Fn(list):        
    If list.value == 4: 
        Li.append(1)
 rdd.mapValues(lambda x:fn(x))

当我尝试打印 Li 结果是:[]

我要做的是在转换 rdd 对象的同时转换另一个全局列表 Li1 。但是,当我这样做时,最后总是有一个空列表。Li1 永远不会被转换。 

4

1 回答 1

1

执行 s - 后将Li值设置为的原因是因为 Spark 序列化函数(以及它引用的所有全局变量 - 它称为闭包)并发送到另一台机器 - 工作者。[]mapValueFn

但是没有完全对应的机制可以将带有闭包的结果从工作人员发送回驱动程序。

为了接收结果 - 您需要从函数返回并使用类似take()or的操作collect()。但要小心——您不想发回的数据超出驱动程序的内存容量——否则 Spark 应用程序将抛出内存不足异常。

此外,您还没有对 RDDmapValues转换执行操作 - 因此在您的示例中,没有对工作人员执行任何任务。

rdd = sc.parallelize([(x, x+1) for x in range(2, 5)])

def Fn(value):
    return value*2

Li = rdd.mapValues(lambda x:Fn(x)).collect()

print Li

会导致

[(2, 6), (3, 8), (4, 10)]

埃迪

按照您的问题描述(基于我对您想要做什么的理解):

L1 = range(20)
rdd = sc.parallelize(L1)

L2 = rdd.filter(lambda x: x % 2==0).collect()

print L2
>>> [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
于 2015-06-16T01:09:17.440 回答