在我的 PySpark 应用程序中,我有两个 RDD:
items - 这包含所有有效项目的项目 ID 和项目名称。大约 100000 项。
属性表 - 这包含字段用户 ID、项目 ID 和此组合的属性值。这些是系统中每个用户-项目组合的特定属性。这个 RDD 有几百行,每行 1000 行。
我想丢弃属性表 RDD 中与项目 RDD 中的有效项目 ID(或名称)不对应的所有行。换句话说,通过项目 ID 进行半连接。例如,如果这些是 R 数据帧,我会做semi_join(attributeTable, items, by="itemID")
我首先尝试了以下方法,但发现这需要永远返回(在我的 PC 上的 VM 上运行的本地 Spark 安装上)。可以理解的是,因为涉及的比较数量如此之多:
# Create a broadcast variable of all valid item IDs for doing filter in the drivers
validItemIDs = sc.broadcast(items.map(lambda (itemID, itemName): itemID)).collect())
attributeTable = attributeTable.filter(lambda (userID, itemID, attributes): itemID in set(validItemIDs.value))
经过一番摆弄,我发现以下方法工作得非常快(在我的系统上一分钟左右)。
# Create a broadcast variable for item ID to item name mapping (dictionary)
itemIdToNameMap = sc.broadcast(items.collectAsMap())
# From the attribute table, remove records that don't correspond to a valid item name.
# First go over all records in the table and add a dummy field indicating whether the item name is valid
# Then, filter out all rows with invalid names. Finally, remove the dummy field we added.
attributeTable = (attributeTable
.map(lambda (userID, itemID, attributes): (userID, itemID, attributes, itemIdToNameMap.value.get(itemID, 'Invalid')))
.filter(lambda (userID, itemID, attributes, itemName): itemName != 'Invalid')
.map(lambda (userID, itemID, attributes, itemName): (userID, itemID, attributes)))
尽管这对我的应用程序来说效果很好,但感觉更像是一种肮脏的解决方法,我很确定在 Spark 中必须有另一种更清洁或惯用正确(并且可能更有效)的方式来做到这一点。你有什么建议?我是 Python 和 Spark 的新手,所以如果您能指出正确的资源,任何 RTFM 建议也会有所帮助。
我的 Spark 版本是 1.3.1。