2

我在 MacBook(i5、2.6GHz、8GB 内存)上使用 Zeppelin NB 和 Spark 在独立模式下进行了一些实验。spark.executor/driver.memory 都得到 2g。我也在spark.serializer org.apache.spark.serializer.KryoSerializerspark-defaults.conf 中设置了,但这似乎被 zeppelin 忽略了


ALS模型

我已经训练了一个具有约 400k(隐式)评级的 ALS 模型,并希望获得建议val allRecommendations = model.recommendProductsForUsers(1)

样本集

接下来我拿一个样本来玩

val sampledRecommendations = allRecommendations.sample(false, 0.05, 1234567).cache

这包含 3600 条建议。

移除用户拥有的产品推荐

接下来我想删除给定用户已经拥有的产品的所有评级,我在 RDD 中保存的列表 (user_id, Set[product_ids]):RDD[(Long, scala.collection.mutable.HashSet[Int])]

val productRecommendations = (sampledRecommendations
// add user portfolio to the list, but convert the key from Long to Int first
.join(usersProductsFlat.map( up => (up._1.toInt, up._2) ))
.mapValues(
    // (user, (ratings: Array[Rating], usersOwnedProducts: HashSet[Long]))
    r => (r._1
        .filter( rating => !r._2.contains(rating.product))
        .filter( rating => rating.rating > 0.5)
        .toList
    )
  )
  // In case there is no recommendation (left), remove the entry
  .filter(rating => !rating._2.isEmpty)
).cache

问题 1在缓存样本集上 调用此 ( productRecommendations.count) 会生成一个包含flatMap at MatrixFactorizationModel.scala:27810,000任务、263.6 MB 输入数据和 196.0 MB 随机写入的阶段。不应该使用微小的和缓存的RDD吗?这里发生了什么(wr)on(g)?计数的执行需要将近 5 分钟!

问题 2usersProductsFlat.count根据应用程序 UI 中的“存储”视图完全缓存的 调用每次大约需要 60 秒。它的大小为 23Mb – 不应该快很多吗?

映射为可读形式

接下来,我以某种可读的形式将 ID 替换为广播查找 Map 中的名称,以放入 DF/表中:

val readableRatings = (productRecommendations
    .flatMapValues(x=>x)
    .map( r => (r._1, userIdToMailBC.value(r._1), r._2.product.toInt, productIdToNameBC.value(r._2.product), r._2.rating))
).cache
val readableRatingsDF = readableRatings.toDF("user","email", "product_id", "product", "rating").cache
readableRatingsDF.registerTempTable("recommendations")

选择……耐心

疯狂的部分从这里开始。做一个 SELECT 需要几个小时(我永远不能等待一个完成):

%sql
SELECT COUNT(user) AS usr_cnt, product, AVG(rating) AS avg_rating
FROM recommendations
GROUP BY product

查询几乎永远


我不知道在哪里可以找到这里的瓶颈,这里显然发生了一些巨大的混乱!我可以从哪里开始寻找?

4

1 回答 1

1

您的分区数可能太大。我认为在本地模式下运行时应该使用大约 200 而不是 10000。您可以通过不同的方式设置分区数。我建议您编辑 Spark 配置文件中的 spark.default.parallelism 标志。

于 2015-11-16T17:27:49.220 回答