0

运行我编写的以下 SPARK 代码时出现错误。我正在尝试根据键找到所有向量的总和。每个输入行以键(整数)开头,然后是 127 个浮点数,这是一个具有 127 维的单个向量,即每行以一个键和一个向量开头。


from cStringIO import StringIO

class testing:
    def __str__(self):
        file_str = StringIO()
        for n in self.vector:
            file_str.write(str(n)) 
            file_str.write(" ")
        return file_str.getvalue()
    def __init__(self,txt="",initial=False):
        self.vector = [0.0]*128
        if len(txt)==0:
            return
        i=0
        for n in txt.split():
            if i<128:
                self.vector[i]=float(n)
                i = i+1
                continue
            self.filename=n
            break
def addVec(self,r):
    a = testing()
    for n in xrange(0,128):
        a.vector[n] = self.vector[n] + r.vector[n]
    return a

def InitializeAndReturnPair(string,first=False):
    vec = testing(string,first)
    return 1,vec


from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

inp = sc.textFile("input.txt")
output = inp.map(lambda s: InitializeAndReturnPair(s,True)).cache()
output.saveAsTextFile("output")
print output.reduceByKey(lambda a,b : a).collect()

示例行input.txt

6.0 156.0 26.0 3.0 1.0 0.0 2.0 1.0 15.0 113.0 53.0 139.0 156.0 0.0 0.0 0.0 156.0 29.0 1.0 38.0 59.0 0.0 0.0 0.0 28.0 4.0 2.0 9.0 1.0 0.0 0.0 0.0 9.0 83.0 13.0 1.0 0.0 9.0 42.0 7.0 41.0 71.0 74.0 123.0 35.0 17.0 7.0 2.0 156.0 27.0 6.0 33.0 11.0 2.0 0.0 11.0 35.0 4.0 2.0 4.0 1.0 3.0 2.0 4.0 0.0 0.0 0.0 0.0 2.0 19.0 45.0 17.0 47.0 2.0 2.0 7.0 59.0 90.0 15.0 11.0 156.0 14.0 1.0 4.0 9.0 11.0 2.0 29.0 35.0 6.0 5.0 9.0 4.0 2.0 1.0 3.0 1.0 0.0 0.0 0.0 1.0 5.0 25.0 14.0 27.0 2.0 0.0 2.0 86.0 48.0 10.0 6.0 156.0 23.0 1.0 2.0 21.0 6.0 0.0 3.0 31.0 10.0 4.0 3.0 0.0 0.0 1.0 2.0

下面是我得到的错误。此错误来自代码的最后一行,即output.reduceByKey

错误消息 - http://pastebin.com/0tqiiJQm

不太确定如何解决这个问题。我尝试使用 aMarshalSerializer但它给出了同样的问题。

- - - - - - - - - - - - - - - 回答 - - - - - - - - - - -----------------

我从apache 用户列表中得到了相同问题的答案。基本上,在集群中运行的映射器/归约器没有类定义,我们必须通过在不同的模块中编写类并在使用配置 SparkContext 时附加来传递类

sc.addPyFile(os.path( HOMEDirectory + "module.py"))

谢谢大家帮助我。

4

1 回答 1

0

您可以使用与 spark 配合良好的 numpy 数组。

import numpy as np

def row_map(text):
    split_text = text.split()
    # create numpy array from elements besides the first element 
    # which is the key
    return split_text(0), np.array([float(v) for v in split_text[1:]])

from pyspark import SparkConf, SparkContext
conf = (SparkConf()
     .setMaster("local")
     .setAppName("My app")
     .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)

inp = sc.textFile("input.txt")
output = inp.map(row_map).cache()
#Below line is throwing error
print output.reduceByKey(lambda a,b : np.add(a,b)).collect()

更简洁和pythonic。

于 2014-10-26T21:43:23.330 回答