2

我想filter删除字段“状态”不等于“确定”的 RDD 元素。我从 HDFS 上的一组 CSV 文件创建我的 RDD,然后map在尝试之前获取我想要的结构filter

import csv, StringIO    

files = "/hdfs_path/*.csv"

fields = ["time", "status"]

dial = "excel"

default = {'status': 'OK', 'time': '2014-01-01  00:00:00'}

def loadRecord(line, fieldnames, dialect):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames = fieldnames, dialect = dialect)
    try:
        line = reader.next()
        if line is None:
            return default
        else:
            return line
    except:
        return default

harmonics = sc.textFile(files) \
              .map(lambda x: loadRecord(x, fields, dial)) \
              .filter(lambda x: "OK" not in x['status'])

我可以对这个 RDD 做其他事情——例如mapget某些字段做其他事情,等等。但是,当我使用 运行我的代码时,其中一个任务总是失败,并在我的lambda 函数filter中出现异常:filter

'NoneType object is not iterable'

我认为这意味着filterlambda 正在接收None,所以我添加了代码loadRecord以避免返回None。但是,我仍然遇到同样的错误。它确实适用于一个小样本数据集,但我的实际数据足够大,我不确定如何检测它的哪些部分可能导致问题。

任何输入表示赞赏!

4

2 回答 2

3

First, relpace map(lambda x: loadRecord(x, fields, dial)) with map(lambda x: (x, loadRecord(x, fields, dial))) - this way you save both original record and the parsed one.

Second, replace filter() call with flatMap(test_function) and define the test_function the way it tests the input and if the second passed parameter is None (parsed record) it whould return the first one.

This way you would get the input lines causing your problem and would test your script on them locally. And in general I would add a line global default as a first line of your loadRecord function

于 2015-03-12T11:03:56.023 回答
0

使用0x0FFF的答案作为基础,我能够让我的代码运行。我仍然没有看到违规文件的违规行,但我比以前更接近了。这是我所做的,从我的问题中的代码开始:

def checkNone(x):
    try:
        return "OK" not in x['status']
    except:
        return True

harmonics = sc.textFile(files) \
              .map(lambda x: loadRecord(x, fields, dial)) \
              .filter(lambda x: checkNone(x))
于 2015-03-12T20:04:31.527 回答