10

假设我有一排电话记录格式:

[CallingUser, ReceivingUser, Duration]

如果我想知道给定用户在电话上的总时间(用户是呼叫用户或接收用户的持续时间总和)。

实际上,对于给定的记录,我想创建 2 对(CallingUser, Duration)(ReceivingUser, Duration).

最有效的方法是什么?我可以将 2 加RDDs在一起,但我不清楚这是否是一个好方法:

#Sample Data:
callData = sc.parallelize([["User1", "User2", 2], ["User1", "User3", 4], ["User2", "User1", 8]  ])


calls = callData.map(lambda record: (record[0], record[2]))

#The potentially inefficient map in question:
calls += callData.map(lambda record: (record[1], record[2]))


reduce = calls.reduceByKey(lambda a, b: a + b)
4

2 回答 2

11

你想要平面地图。如果您编写一个返回列表的函数,[(record[0], record[2]),(record[1],record[2])]那么您可以对其进行平面映射!

于 2015-02-27T07:15:46.557 回答
8

使用 flatMap() 可以很好地获取单个输入并生成多个映射输出。完成代码:

callData = sc.parallelize([["User1", "User2", 2], ["User1", "User3", 4], ["User2", "User1", 8]])

calls = callData.flatMap(lambda record: [(record[0], record[2]), (record[1], record[2])])
print calls.collect()
# prints [('User1', 2), ('User2', 2), ('User1', 4), ('User3', 4), ('User2', 8), ('User1', 8)]

reduce = calls.reduceByKey(lambda a, b: a + b)
print reduce.collect()
# prints [('User2', 10), ('User3', 4), ('User1', 14)]
于 2015-06-09T04:44:41.437 回答