4

在我的 PySpark 应用程序中,我有两个 RDD:

  • items - 这包含所有有效项目的项目 ID 和项目名称。大约 100000 项。

  • 属性表 - 这包含字段用户 ID、项目 ID 和此组合的属性值。这些是系统中每个用户-项目组合的特定属性。这个 RDD 有几百行,每行 1000 行。

我想丢弃属性表 RDD 中与项目 RDD 中的有效项目 ID(或名称)不对应的所有行。换句话说,通过项目 ID 进行半连接。例如,如果这些是 R 数据帧,我会做semi_join(attributeTable, items, by="itemID")

我首先尝试了以下方法,但发现这需要永远返回(在我的 PC 上的 VM 上运行的本地 Spark 安装上)。可以理解的是,因为涉及的比较数量如此之多:

# Create a broadcast variable of all valid item IDs for doing filter in the drivers
validItemIDs = sc.broadcast(items.map(lambda (itemID, itemName): itemID)).collect())
attributeTable = attributeTable.filter(lambda (userID, itemID, attributes): itemID in set(validItemIDs.value))

经过一番摆弄,我发现以下方法工作得非常快(在我的系统上一分钟左右)。

# Create a broadcast variable for item ID to item name mapping (dictionary) 
itemIdToNameMap = sc.broadcast(items.collectAsMap())

# From the attribute table, remove records that don't correspond to a valid item name.
# First go over all records in the table and add a dummy field indicating whether the item name is valid
# Then, filter out all rows with invalid names. Finally, remove the dummy field we added.
attributeTable = (attributeTable
                  .map(lambda (userID, itemID, attributes): (userID, itemID, attributes, itemIdToNameMap.value.get(itemID, 'Invalid')))
                  .filter(lambda (userID, itemID, attributes, itemName): itemName != 'Invalid')
                  .map(lambda (userID, itemID, attributes, itemName): (userID, itemID, attributes)))

尽管这对我的应用程序来说效果很好,但感觉更像是一种肮脏的解决方法,我很确定在 Spark 中必须有另一种更清洁或惯用正确(并且可能更有效)的方式来做到这一点。你有什么建议?我是 Python 和 Spark 的新手,所以如果您能指出正确的资源,任何 RTFM 建议也会有所帮助。

我的 Spark 版本是 1.3.1。

4

2 回答 2

2

只需进行常规连接,然后丢弃“查找”关系(在您的情况下为itemsrdd)。

如果这些是您的 RDD (来自另一个答案的示例)

items = sc.parallelize([(123, "Item A"), (456, "Item B")])
attributeTable = sc.parallelize([(123456, 123, "Attribute for A")])

那么你会这样做:

attributeTable.keyBy(lambda x: x[1])
  .join(items)
  .map(lambda (key, (attribute, item)): attribute)

结果,您只有来自 RDD 的元组,attributeTable它们在 RDD 中有相应的条目items

[(123456, 123, 'Attribute for A')]

按照另一个答案中leftOuterJoin的建议进行操作也可以完成这项工作,但效率较低。此外,另一个答案半连接itemswithattributeTable而不是attributeTablewith items

于 2017-04-12T16:34:06.907 回答
0

正如其他人所指出的,这可能通过利用 DataFrame 最容易实现。leftOuterJoin但是,您也许可以通过使用和filter函数来实现您的预​​期目标。像下面这样有点骇人听闻的东西可能就足够了:

items = sc.parallelize([(123, "Item A"), (456, "Item B")])
attributeTable = sc.parallelize([(123456, 123, "Attribute for A")])
sorted(items.leftOuterJoin(attributeTable.keyBy(lambda x: x[1]))
       .filter(lambda x: x[1][1] is not None)
       .map(lambda x: (x[0], x[1][0])).collect())

返回

[(123, 'Item A')]
于 2015-06-30T17:50:16.397 回答