7

我有一个映射器方法:

def mapper(value):
    ...
    for key, value in some_list:
        yield key, value

实际上,我需要的与普通的字数示例相差不远。我已经有了工作脚本,但前提是映射器方法如下所示:

def mapper(value):
    ...
    return key, value

这是它的调用的样子:

sc.textFile(sys.argv[2], 1).map(mapper).reduceByKey(reducer).collect()

我花了 2 个小时尝试编写支持 mapper 中的生成器的代码。但不能那样做。我什至同意只返回一个列表:

def mapper(value):
    ...
    result_list = []
    for key, value in some_list:
        result_list.append( key, value )
    return result_list

在这里:https : //groups.google.com/forum/#!searchin/spark-users/flatmap $20multiple/spark-users/1WqVhRBaJsU/-D5QRbenlUgJ 我发现我应该使用flatMap,但它没有成功- 然后我的减速器开始获取输入,例如 (key1, value1, key2, value2, value3, ...) - 但它应该是 [(key1, value1), (key2, value2, value3)...]。换句话说,reducer 开始只取单件,不知道它是一个值还是一个键,如果是值 - 它属于哪个键。

那么如何使用返回迭代器或列表的映射器呢?

谢谢!

4

1 回答 1

11

flatMap如果你想要一个返回多个输出的 map 函数,你可以使用。

传递给的函数flatMap可以返回一个可迭代的:

>>> words = sc.textFile("README.md")
>>> def mapper(line):
...     return ((word, 1) for word in line.split())
...
>>> words.flatMap(mapper).take(4)
[(u'#', 1), (u'Apache', 1), (u'Spark', 1), (u'Lightning-Fast', 1)]
>>> counts = words.flatMap(mapper).reduceByKey(lambda x, y: x + y)
>>> counts.take(5)
[(u'all', 1), (u'help', 1), (u'webpage', 1), (u'when', 1), (u'Hadoop', 12)]

它也可以是生成器函数:

>>> words = sc.textFile("README.md")
>>> def mapper(line):
...     for word in line.split():
...         yield (word, 1)
...
>>> words.flatMap(mapper).take(4)
[(u'#', 1), (u'Apache', 1), (u'Spark', 1), (u'Lightning-Fast', 1)]
>>> counts = words.flatMap(mapper).reduceByKey(lambda x, y: x + y)
>>> counts.take(5)
[(u'all', 1), (u'help', 1), (u'webpage', 1), (u'when', 1), (u'Hadoop', 12)]

您提到您尝试过flatMap,但它将所有内容扁平化为一个列表,[key, value, key, value, ...]而不是[(key, value), (key, value)...]键值对列表。我怀疑这是您的地图功能中的问题。如果您仍然遇到此问题,您能否发布更完整的地图功能版本?

于 2014-01-13T18:20:03.110 回答