1

我在 EMR 上有一个 Spark Scala 作业,前面的阶段都运行顺利,但到了最后一个 job 时,有大约 2 小时看不到任何进展。所有 executor 上的任务分布都很均匀,所以我认为这不是数据倾斜问题。之前这个作业可以正常完成,但我发现了一个 bug:map 被整体覆盖,而不是被更新。修复这个 bug 之后,就出现了这种变慢的情况,我不知道该如何解决。这部分作业会读写 Redis,并使用了两个 Scala 可变 map(mutable Map);我不确定是什么导致了变慢,非常感谢任何帮助。该数据集已按 col("id1") 重新分区。

// Per-partition pass over `rowsToCompute`: for every (id1, id2) pair that is not
// blocked by a restriction, write a count to Redis, remember the accepted entry in
// a partition-local history, and emit one output Row(id1, id2).
val result: Dataset[Row] = rowsToCompute.mapPartitions(iterator => {
  case class countDateMap(count: Long, dateTime: ZonedDateTime)
  // id1 -> (id2 -> entries accepted so far within this partition).
  val id1CountDateMap = scala.collection.mutable.Map[String, scala.collection.mutable.Map[Long, Seq[countDateMap]]]()

  val rowResult: ListBuffer[Row] = ListBuffer()
  // Configure the Redis client from the broadcast values before any row is processed.
  RedisClient.writeHost = writeHost.value
  RedisClient.readHost = readHost.value
  RedisClient.port = port.value
  RedisClient.keyTTL = keyTTL.value
  RedisClient.writePoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.writePoolConfig.setMaxTotal(writeMaxTotal.value)
  RedisClient.writePoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.readPoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxTotal(readMaxTotal.value)

  iterator.foreach(row => {
    val id1 = row.getAs[String]("id1")
    val availableRowsForId1 = row.getAs[Seq[Row]]("rows")
    // Reuse the history already collected for this id1, creating it only on first
    // sight, so earlier entries are updated in place rather than overwritten.
    val rowsMap = id1CountDateMap.getOrElseUpdate(id1, scala.collection.mutable.Map[Long, Seq[countDateMap]]())

    availableRowsForId1.foreach(o => {
      val id2 = o.getAs[Long]("id2")
      // Look up through the hoisted rowsMap instead of re-resolving id1 per nested row.
      val currentRowCountDateTime = rowsMap.get(id2)
      // Kept as vars: the elided business logic below is expected to reassign them.
      var restriction1 = false
      var restriction2 = false
      var localCount: Long = 1
      // NOTE(review): the value is read as `TimeStamp` but `countDateMap.dateTime` is
      // declared as ZonedDateTime -- confirm the intended type/conversion.
      var dateTime = o.getAs[TimeStamp]("dateTime")
      if (currentRowCountDateTime.isDefined) {
        //business logic that checks restriction1 & restriction2 based on currentRowCountDateTime
      }

      if (!restriction1 && !restriction2) {
        // NOTE(review): `redisCount` is read as a String, so `+ 1` appends the
        // character "1" (e.g. "5" becomes "51") rather than incrementing -- confirm
        // this is intended. Also one Redis call is issued per accepted row;
        // presumably it is a blocking round trip -- consider pipelining if it
        // dominates the task time.
        RedisClient.setCount(id2.toString, o.getAs[String]("redisCount") + 1)
        val newCountDate = countDateMap(localCount, dateTime)
        // Append to a Vector-backed history: `:+` avoids copying the whole sequence
        // on every accepted row, which the previous `union(Seq(x))` did.
        val newCountDateSeq: Seq[countDateMap] =
          currentRowCountDateTime.fold[Seq[countDateMap]](Vector(newCountDate))(_ :+ newCountDate)
        rowsMap.update(id2, newCountDateSeq)
        rowResult += RowFactory.create(id1, id2.toString)
      }
    })
  })
  // The buffer is complete at this point; iterate it directly instead of copying it.
  rowResult.iterator
})(encoder)
//OLD THAT RAN QUICK BUT HAD A BUG B/C rowsMap GOT OVERWRITTEN NOT UPDATED
// Earlier variant of the per-partition computation: for every (id1, id2) pair that is
// not blocked by a restriction it calls RedisClient.setCount, records the entry in a
// partition-local map, and emits one output Row(id1, id2).
// NOTE(review): as pasted, this snippet has one more closing `})` than it opens.
val result: Dataset[Row] = rowsToCompute.mapPartitions(iterator => {
  case class countDateMap(count: Long, dateTime: ZonedDateTime)
  // id1 -> (id2 -> recorded entries) for the rows seen in this partition.
  val id1CountDateMap = new ConcurrentMap[String, Map[Long, Seq[countDateMap]]]()

  var rowResult: ListBuffer[Row] = ListBuffer()
  // Configure the Redis client from the broadcast values before any row is processed.
  RedisClient.writeHost = writeHost.value
  RedisClient.readHost = readHost.value
  RedisClient.port = port.value
  RedisClient.keyTTL = keyTTL.value
  RedisClient.writePoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.writePoolConfig.setMaxTotal(writeMaxTotal.value)
  RedisClient.writePoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxIdle(maxIdle.value)
  RedisClient.readPoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
  RedisClient.readPoolConfig.setMaxTotal(readMaxTotal.value)

  iterator.foreach(row => {
    val id1 = row.getAs("id1").asInstanceOf[String]
    val availableRowsForId1 = row.getAs("rows").asInstanceOf[Seq[Row]]
    // A brand-new empty map is built for every input row ...
    var rowsMap= scala.collection.mutable.Map[Long, Seq[countDateMap]]()
    // ... and presumably registered only when id1 has no entry yet
    // (NOTE(review): `putifAbsent` looks like a typo for `putIfAbsent` -- confirm).
    id1CountDateMap.putifAbsent(id1, rowsMap)
    
    availableRowsForId1.foreach(o => {
        val id2 = o.getAs("id2").asInstanceOf[Long]
        // Reads go through the map stored under id1 ...
        val currentRowCountDateTime = id1CountDateMap(id1).get(id2)
        var restriction1 = false
        var restriction2 = false
        var localCount: Long = 1
        var dateTime = o.getAs("dateTime").asInstanceOf[TimeStamp]
        if (currentRowCountDateTime.isDefined) {
          //business logic that checks restriction1 & restriction2 based on currentRowCountDateTime
        } 

        if (!restriction1 && !restriction2) {
          val set = RedisClient.setCount(id2.toString, (o.getAs("redisCount").asInstanceOf[String] + 1).toString)
          val newCountDate = countDateMap(localCount, dateTime)
          var newCountDateSeq = Seq(newCountDate)
          if (currentRowCountDateTime.nonEmpty) {
            // NOTE(review): `newCountDate Seq` contains a space; presumably
            // `newCountDateSeq` was meant here and in the update call below.
            newCountDate Seq = currentRowCountDateTime.get.union(Seq(newCountDate ))
          }
          // ... but writes go to this row's fresh rowsMap, which then replaces the
          // entry stored for id1. When id1 already had a map from an earlier row, its
          // previously recorded id2 entries are presumably dropped here: this is the
          // "overwritten, not updated" behaviour.
          rowsMap.update(id2, newCountDate Seq)
          id1CountDateMap.replace(id1, rowsMap)
          rowResult += RowFactory.create(id1, id2.toString)
        }
      })
    })
  })
  // Hand the buffered output rows back to Spark as this partition's iterator.
  rowResult.toList.iterator
})(encoder)

任务指标摘要

在此处输入图像描述

4

0 回答 0