
I am new to Spark and am trying to build network clusters from user-supplied tags (attributes). I first use MinHash LSH (Jaccard distance) to produce similarity scores, then feed those scores to the power iteration clustering (PIC) algorithm. As soon as the PIC step starts, CPU activity drops to zero and the job runs for a long time without making any progress. How should I configure the cluster or change the code to get this to run? My code is below.

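import org.apache.spark.ml.feature.{MinHashLSH, VectorAssembler}
import org.apache.spark.mllib.clustering.PowerIterationClustering

// about 10,000 rows of (id, 100 tags in binary form)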

val data = spark.read.format("csv").option("header", "true").option("delimiter", ",").option("inferSchema","true").load("gs://data/*.csv")

val columnNames = data.columns

val tags = columnNames.slice(1, columnNames.size)

//put tags in a vector
val assembler = new VectorAssembler().setInputCols(tags).setOutputCol("attributes")

val newData = assembler.transform(data).select("userID","attributes")

val mh = new MinHashLSH().setNumHashTables(5).setInputCol("attributes").setOutputCol("values")

val modelMINHASH = mh.fit(newData)

// Approximate similarity self-join: keep pairs whose Jaccard distance is below 0.9, one row per unordered pair
val fullData = modelMINHASH.approxSimilarityJoin(newData , newData , 0.9).filter("datasetA.userID < datasetB.userID")

val explodeDF = fullData.withColumn("id", fullData("datasetA.userID")).withColumn("id2", fullData("datasetB.userID")).select("id","id2","distCol")

val temp = explodeDF.rdd

val newRDD = temp.map(x => (x.getAs[Integer]("id").longValue(),x.getAs[Integer]("id2").longValue(),1-x.getAs[Double]("distCol"))).cache()

// this is where the code halts and I see no progress
val modelPIC = new PowerIterationClustering().setK(16).setMaxIterations(5).run(newRDD)

val clusters = modelPIC.assignments
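
To make clear what I am feeding into PIC: `distCol` from the similarity join is a Jaccard distance, and I convert it to a similarity with `1 - distCol` because PIC expects non-negative similarities. Here is a minimal plain-Scala sketch of that conversion for two made-up users. `JaccardSketch` and `jaccardDistance` are hypothetical names for illustration only, not part of Spark or of my job.

```scala
// Hypothetical helper, not a Spark API: Jaccard distance between two users,
// each represented by the set of tag indices whose value is 1.
object JaccardSketch {
  def jaccardDistance(a: Set[Int], b: Set[Int]): Double = {
    val unionSize = (a union b).size
    // Two empty tag sets are treated as identical here (an assumption of this sketch).
    if (unionSize == 0) 0.0
    else 1.0 - (a intersect b).size.toDouble / unionSize
  }

  def main(args: Array[String]): Unit = {
    val userA = Set(0, 3, 7) // made-up active tag indices
    val userB = Set(3, 7, 9)
    val dist = jaccardDistance(userA, userB) // |{3,7}| / |{0,3,7,9}| = 2/4, so distance 0.5
    val similarity = 1.0 - dist              // the value passed to PIC as the edge weight
    println(s"distance=$dist similarity=$similarity")
  }
}
```

With a join threshold of 0.9, any pair sharing more than 10% of its combined tags becomes an edge, so the similarity graph passed to PIC can be quite dense.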