I have a Spark Scala job on EMR that runs smoothly until the very last stage, where I then see no progress for 2 hours. The executors show that tasks are evenly distributed, so I don't believe this is a data-skew problem. The job does complete, but I had found a bug where a map was being overwritten instead of updated, and ever since fixing that bug I have been seeing this slowness and don't know how to resolve it. This part of the job reads from and writes to Redis and uses two Scala mutable maps. I'm not sure what is causing the slowness, so any help would be appreciated. The dataset is repartitioned by col("id1").
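For context, rowsToCompute is produced upstream roughly like this (a minimal sketch of the assumed shape; df and the exact aggregated columns are assumptions, not the actual job):

import org.apache.spark.sql.functions._

// assumed upstream: collect each id1's rows together, then repartition by id1
val rowsToCompute = df
  .groupBy("id1")
  .agg(collect_list(struct("id2", "dateTime", "redisCount")).as("rows"))
  .repartition(col("id1"))

Here is the current (fixed but slow) section: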
import java.sql.Timestamp
import java.time.{ZonedDateTime, ZoneOffset}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{Dataset, Row, RowFactory}

val result: Dataset[Row] = rowsToCompute.mapPartitions(iterator => {
case class countDateMap(count: Long, dateTime: ZonedDateTime)
// accumulates, per id1, a map of id2 -> every (count, dateTime) entry seen so far in this partition
val id1CountDateMap = scala.collection.mutable.Map[String, scala.collection.mutable.Map[Long, Seq[countDateMap]]]()
val rowResult: ListBuffer[Row] = ListBuffer()
// configure the Redis client once per partition from the broadcast settings
RedisClient.writeHost = writeHost.value
RedisClient.readHost = readHost.value
RedisClient.port = port.value
RedisClient.keyTTL = keyTTL.value
RedisClient.writePoolConfig.setMaxIdle(maxIdle.value)
RedisClient.writePoolConfig.setMaxTotal(writeMaxTotal.value)
RedisClient.writePoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
RedisClient.readPoolConfig.setMaxIdle(maxIdle.value)
RedisClient.readPoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
RedisClient.readPoolConfig.setMaxTotal(readMaxTotal.value)
iterator.foreach(row => {
val id1 = row.getAs("id1").asInstanceOf[String]
val availableRowsForId1 = row.getAs("rows").asInstanceOf[Seq[Row]]
// reuse the map already accumulated for this id1 (this is the fix: the old version replaced it with a fresh map)
val rowsMap = id1CountDateMap.getOrElseUpdate(id1, scala.collection.mutable.Map[Long, Seq[countDateMap]]())
availableRowsForId1.foreach(o => {
val id2 = o.getAs("id2").asInstanceOf[Long]
val currentRowCountDateTime = id1CountDateMap(id1).get(id2)
var restriction1 = false
var restriction2 = false
var localCount: Long = 1
// convert the SQL Timestamp to the ZonedDateTime the case class expects (UTC assumed here)
val dateTime = o.getAs("dateTime").asInstanceOf[Timestamp].toInstant.atZone(ZoneOffset.UTC)
if (currentRowCountDateTime.isDefined) {
//business logic that checks restriction1 & restriction2 based on currentRowCountDateTime
}
if (!restriction1 && !restriction2) {
// "redisCount" arrives as a String, so parse it before incrementing ("count" + 1 would just concatenate)
val set = RedisClient.setCount(id2.toString, (o.getAs("redisCount").asInstanceOf[String].toLong + 1).toString)
val newCountDate = countDateMap(localCount, dateTime)
var newCountDateSeq = Seq(newCountDate)
if (currentRowCountDateTime.nonEmpty) {
newCountDateSeq = currentRowCountDateTime.get.union(Seq(newCountDate))
}
rowsMap.update(id2, newCountDateSeq)
rowResult += RowFactory.create(id1, id2.toString)
}
})
})
rowResult.toList.iterator
})(encoder)
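Comparing this with the old version below (which ran quickly): because the fixed code keeps reusing the per-id1 accumulated map, currentRowCountDateTime is now defined far more often, so the union branch actually runs, and Seq.union copies the entire sequence on every append. A standalone sketch of that cost (illustrative only, not the job code):

// appending n items one at a time via union is O(n^2) overall,
// because each union builds a full copy of the accumulated Seq
var acc: Seq[Int] = Seq.empty
(1 to 10000).foreach { i =>
  acc = acc.union(Seq(i)) // full copy of acc on every append
}

// an amortized O(1) append with a buffer, as a possible alternative
import scala.collection.mutable.ListBuffer
val buf = ListBuffer.empty[Int]
(1 to 10000).foreach(buf += _)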
// OLD VERSION: ran quickly, but had a bug because rowsMap was overwritten instead of updated
val result: Dataset[Row] = rowsToCompute.mapPartitions(iterator => {
case class countDateMap(count: Long, dateTime: ZonedDateTime)
// ConcurrentMap is an interface; this needs a concrete ConcurrentHashMap, typed to match the mutable maps stored below
val id1CountDateMap = new ConcurrentHashMap[String, scala.collection.mutable.Map[Long, Seq[countDateMap]]]()
val rowResult: ListBuffer[Row] = ListBuffer()
RedisClient.writeHost = writeHost.value
RedisClient.readHost = readHost.value
RedisClient.port = port.value
RedisClient.keyTTL = keyTTL.value
RedisClient.writePoolConfig.setMaxIdle(maxIdle.value)
RedisClient.writePoolConfig.setMaxTotal(writeMaxTotal.value)
RedisClient.writePoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
RedisClient.readPoolConfig.setMaxIdle(maxIdle.value)
RedisClient.readPoolConfig.setMaxWaitMillis(poolWaitTimeout.value)
RedisClient.readPoolConfig.setMaxTotal(readMaxTotal.value)
iterator.foreach(row => {
val id1 = row.getAs("id1").asInstanceOf[String]
val availableRowsForId1 = row.getAs("rows").asInstanceOf[Seq[Row]]
// a FRESH map is created for every outer row, even when id1 has been seen before
val rowsMap = scala.collection.mutable.Map[Long, Seq[countDateMap]]()
id1CountDateMap.putIfAbsent(id1, rowsMap)
availableRowsForId1.foreach(o => {
val id2 = o.getAs("id2").asInstanceOf[Long]
val currentRowCountDateTime = id1CountDateMap.get(id1).get(id2)
var restriction1 = false
var restriction2 = false
var localCount: Long = 1
val dateTime = o.getAs("dateTime").asInstanceOf[Timestamp].toInstant.atZone(ZoneOffset.UTC)
if (currentRowCountDateTime.isDefined) {
//business logic that checks restriction1 & restriction2 based on currentRowCountDateTime
}
if (!restriction1 && !restriction2) {
val set = RedisClient.setCount(id2.toString, (o.getAs("redisCount").asInstanceOf[String].toLong + 1).toString)
val newCountDate = countDateMap(localCount, dateTime)
var newCountDateSeq = Seq(newCountDate)
if (currentRowCountDateTime.nonEmpty) {
newCountDateSeq = currentRowCountDateTime.get.union(Seq(newCountDate))
}
rowsMap.update(id2, newCountDateSeq)
// the bug: this swaps in the fresh per-row map, discarding whatever had accumulated for id1
id1CountDateMap.replace(id1, rowsMap)
rowResult += RowFactory.create(id1, id2.toString)
}
})
})
rowResult.toList.iterator
})(encoder)
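To spell out the old bug (as I understand it): putIfAbsent keeps the first map stored for an id1, but the later replace swaps in the fresh per-row map, so earlier entries for that id1 are lost. A standalone sketch:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

val m = new ConcurrentHashMap[String, mutable.Map[Long, String]]()

// first row for id1 "a"
val fresh1 = mutable.Map[Long, String]()
m.putIfAbsent("a", fresh1)
fresh1.update(1L, "x")
m.replace("a", fresh1) // m.get("a") == Map(1 -> "x")

// second row for the same id1 builds ANOTHER fresh map
val fresh2 = mutable.Map[Long, String]()
m.putIfAbsent("a", fresh2) // no-op: "a" is already present
fresh2.update(2L, "y")
m.replace("a", fresh2) // overwrites: the entry 1 -> "x" is gone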
Summary of task metrics: