object NearestNeighbors {
def runNearestNeighbors(data: RDD[Array[(LabeledPoint,Int,Int)]],
kNN: Int,
sampleData: Array[(LabeledPoint,Int,Int)]): Array[(String,Array[((Int,Int),Double)])] = {
val globalNearestNeighborsByIndex = data.mapPartitionsWithIndex(localNearestNeighbors(_,_,kNN,sampleData)).groupByKey().map(x => (x._1,x._2.toArray.sortBy(r => r._2).take(kNN))).collect()
globalNearestNeighborsByIndex
}
private def localNearestNeighbors(partitionIndex: Long,
iter: Iterator[Array[(LabeledPoint,Int,Int)]],
kNN: Int,
sampleData: Array[(LabeledPoint,Int,Int)]): Iterator[(String,((Int,Int),Double))] = {
var result = List[(String,((Int,Int),Double))]()
val dataArr = iter.next
val nLocal = dataArr.size - 1
val sampleDataSize = sampleData.size - 1
val kLocalNeighbors = Array.fill[distanceIndex](sampleDataSize+1)(null)
for {
i1 <- 0 to sampleDataSize
}
kLocalNeighbors(i1) = distanceIndex(sampleData(i1)._3.toInt, sampleData(i1)._2.toInt, DenseVector.zeros[Double](kNN) + Int.MaxValue.toDouble, DenseVector.zeros[Int](kNN))
for (i <- 0 to nLocal) {
val currentPoint = dataArr(i)
val features = currentPoint._1.features
val rowId = currentPoint._3.toInt
for (j <- 0 to sampleDataSize) {
val samplePartitionId = sampleData(j)._2
val sampleRowId = sampleData(j)._3
val sampleFeatures = sampleData(j)._1.features
if (!((rowId == sampleRowId) & (samplePartitionId == partitionIndex))) {
val distance = Math.sqrt(sum((sampleFeatures - features) :* (sampleFeatures - features)))
if (distance < max(kLocalNeighbors(j).distanceVector)) {
val indexToReplace = argmax(kLocalNeighbors(j).distanceVector)
kLocalNeighbors(j).distanceVector(indexToReplace) = distance
kLocalNeighbors(j).neighborRowId(indexToReplace) = rowId
}
}
}
}
for (m <- 0 to sampleDataSize){
for (l <-0 to kNN-1) {
val key = kLocalNeighbors(m).partitionId.toString+","+kLocalNeighbors(m).sampleRowId.toString
val tup = (partitionIndex.toInt,kLocalNeighbors(m).neighborRowId(l))
result.::=(key,(tup,kLocalNeighbors(m).distanceVector(l)))
}
}
result.iterator
}
}
我正在使用https://github.com/anathan90/SparkSMOTE(scala库)来调整数据中少数类的过采样
我有序列化问题,我不知道为什么。
我读了一些关于这个错误的东西,但我不明白
另一件事是我在 scala hadoop 中运行 smote 脚本,这在另一个名为 smote 的脚本中调用了这个对象。
这是错误:
Caused by: java.io.NotSerializableException: NearestNeighbors$
Serialization stack:
- object not serializable (class: NearestNeighbors$, value: NearestNeighbors$@77542834)
- field (class: NearestNeighbors$$anonfun$1, name: $outer, type: class NearestNeighbors$)
- object (class NearestNeighbors$$anonfun$1, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)