我在 MacBook(i5、2.6GHz、8GB 内存)上使用 Zeppelin NB 和 Spark 在独立模式下进行了一些实验。spark.executor/driver.memory 都得到 2g。我也在spark.serializer org.apache.spark.serializer.KryoSerializer
spark-defaults.conf 中设置了,但这似乎被 zeppelin 忽略了
ALS模型
我已经训练了一个具有约 400k(隐式)评级的 ALS 模型,并希望获得建议val allRecommendations = model.recommendProductsForUsers(1)
样本集
接下来我拿一个样本来玩
val sampledRecommendations = allRecommendations.sample(false, 0.05, 1234567).cache
这包含 3600 条建议。
移除用户拥有的产品推荐
接下来我想删除给定用户已经拥有的产品的所有评级,我在 RDD 中保存的列表 (user_id, Set[product_ids]):RDD[(Long, scala.collection.mutable.HashSet[Int])]
val productRecommendations = (sampledRecommendations
// add user portfolio to the list, but convert the key from Long to Int first
.join(usersProductsFlat.map( up => (up._1.toInt, up._2) ))
.mapValues(
// (user, (ratings: Array[Rating], usersOwnedProducts: HashSet[Long]))
r => (r._1
.filter( rating => !r._2.contains(rating.product))
.filter( rating => rating.rating > 0.5)
.toList
)
)
// In case there is no recommendation (left), remove the entry
.filter(rating => !rating._2.isEmpty)
).cache
问题 1在缓存样本集上
调用此 ( productRecommendations.count
) 会生成一个包含flatMap at MatrixFactorizationModel.scala:278
10,000个任务、263.6 MB 输入数据和 196.0 MB 随机写入的阶段。不应该使用微小的和缓存的RDD吗?这里发生了什么(wr)on(g)?计数的执行需要将近 5 分钟!
问题 2usersProductsFlat.count
根据应用程序 UI 中的“存储”视图完全缓存的
调用每次大约需要 60 秒。它的大小为 23Mb – 不应该快很多吗?
映射为可读形式
接下来,我以某种可读的形式将 ID 替换为广播查找 Map 中的名称,以放入 DF/表中:
val readableRatings = (productRecommendations
.flatMapValues(x=>x)
.map( r => (r._1, userIdToMailBC.value(r._1), r._2.product.toInt, productIdToNameBC.value(r._2.product), r._2.rating))
).cache
val readableRatingsDF = readableRatings.toDF("user","email", "product_id", "product", "rating").cache
readableRatingsDF.registerTempTable("recommendations")
选择……耐心
疯狂的部分从这里开始。做一个 SELECT 需要几个小时(我永远不能等待一个完成):
%sql
SELECT COUNT(user) AS usr_cnt, product, AVG(rating) AS avg_rating
FROM recommendations
GROUP BY product
我不知道在哪里可以找到这里的瓶颈,这里显然发生了一些巨大的混乱!我可以从哪里开始寻找?