4

我在 Spark 中使用 Pregel 编写了处理图形的代码,但是对于一个小数据集,它的执行速度非常非常慢。我以前用 pregel 写过程序,但是这段代码运行起来很慢。我的集群由 2 个工作人员组成。每个都有核心 i5 CPU 和 6 GB 的 RAM。这是我在 Pregel 中编写的代码:

def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int) = {

      val temp_graph = graph.mapVertices { case (vid, _) => mutable.HashMap[VertexId, (Double,VertexId)](vid -> (1,vid)) }

      def sendMessage(e: EdgeTriplet[mutable.HashMap[VertexId, (Double,VertexId)], ED]): Iterator[(VertexId, mutable.HashMap[VertexId, List[(Double, VertexId)]])] = {
        val msg1 = e.dstAttr.map{ case (k,v) => (k, List(v)) }
        val msg2 = e.srcAttr.map{ case (k,v) => (k, List(v)) }
        Iterator((e.srcId, msg1), (e.dstId, msg2))
      }

      def mergeMessage(count1: (mutable.HashMap[VertexId, List[(Double,VertexId)]]), count2: (mutable.HashMap[VertexId, List[(Double,VertexId)]]))= {

        val communityMap = new mutable.HashMap[VertexId, List[(Double, VertexId)]]

        (count1.keySet ++ count2.keySet).map(key => {

          val count1Val: List[(Double, VertexId)] = count1.getOrElse(key,Nil)
          val count2Val: List[(Double, VertexId)] = count2.getOrElse(key,Nil)

          val pp = List(count1Val:::count2Val).flatten
          communityMap += key-> pp
        })
        communityMap
      }

      def vertexProgram(vid: VertexId, attr: mutable.HashMap[VertexId,(Double, VertexId)], message: mutable.HashMap[VertexId, List[(Double, VertexId)]]) = {

        if (message.isEmpty) attr
        else {

          val labels_score: mutable.HashMap[VertexId, Double] = message.map {

            key =>

              var value_sum = 0D
              var maxSimilar_result = 0D

              val max_similar = most_similar.filter(x => x._1 == vid).headOption match {
                case Some(x) => x._2 // most similar neighbor
                // case _ => -1
              }


              if (key._2.exists(x=>x._2==max_similar)) {
                maxSimilar_result = key._2.filter(v => v._2 == max_similar).headOption match {
                  case Some(v) => v._1 // is the most similar node in the List?
                  // case _ => 0D
                }
              }
              else maxSimilar_result = 0D

              key._2.map {
                values =>

                  value_sum += values._1 * (broadcastVariable.value(vid)(values._2)._2)

              }
              value_sum += (beta*value_sum)+((1-beta)*maxSimilar_result)

              (key._1,value_sum) //label list
          }


          val max_value = labels_score.maxBy(x=>x._2)._2.toDouble
          val dividedByMax: mutable.Map[VertexId, (Double, Double)] = labels_score.map(x=>(x._1,(x._2,x._2/max_value))) // divide by maximum value

          val resultMap: mutable.HashMap[VertexId, (Double, Double)] = new mutable.HashMap[VertexId,(Double, Double)]

          dividedByMax.map{ row => // select labels more than threshold P = 0.75
            if (row._2._1 >= p) resultMap += row
          }


          val xx = if (resultMap.isEmpty) dividedByMax.take(1).asInstanceOf[mutable.HashMap[VertexId, (Double, Double)]]
          else resultMap


          val rr = xx.map(x=>(x._1,x._2._1))
          val max_for_normalize= rr.values.sum

          val res: mutable.HashMap[VertexId, (Double, VertexId)] = rr.map(x=>(x._1->(x._2/max_for_normalize,vid))) // Normalize labels

          res
        }
      }


      val initialMessage = mutable.HashMap[VertexId, List[(Double,VertexId)]]()

      val overlapCommunitiesGraph = Pregel(temp_graph, initialMessage, maxIterations = maxSteps)(
        vprog = vertexProgram,
        sendMsg = sendMessage,
        mergeMsg = mergeMessage)

      overlapCommunitiesGraph
    }

谁能解释这个执行缓慢的问题在哪里?是不是因为我有 2 个 worker 并且 Pregel 中有很多消息传递和归约操作,所以我的代码在大数据集中的性能下降了很多?

4

0 回答 0