2

在这段代码中,主角的 act 方法中的 case 永远不会匹配,所以我的 wrapUp 方法永远不会被调用。

import java.net.Socket
import scala.actors._
import Actor._
import scala.collection.mutable.ArrayBuffer

object LowPortScanner {

  var lastPort = 0
  var openPorts = ArrayBuffer[Int]()
  var longestRunTime = 00.00
  var results = List[Tuple3[Int, Range, Long]]()

  val host = "localhost"
  val numProcs = 1 to Runtime.getRuntime().availableProcessors()
  val portsPerProc = 1024 / numProcs.size
  val caller = self
  val procPortRanges = numProcs.foldLeft(List[Tuple2[Int, Range]]()) { (portRanges, proc) =>
    val tuple2 = (proc.toInt, (lastPort + 1) to (lastPort + portsPerProc))
    lastPort += portsPerProc
    tuple2 :: portRanges
  }

  def main(args: Array[String]): Unit = {

    //spawn an actor for each processor that scans a given port range
    procPortRanges.foreach { proc =>
      actor {
        caller ! scan(proc._1, proc._2)
      } //end inner actors
    } //end numProcs.foreach  

    //catch results from the processor actors above
    def act {
      loop {
        reactWithin(100) {
          //update the list of results returned from scan
          case scanResult: Tuple3[Int, Range, Long] =>
            println("Processor " + scanResult._1 + " completed scan of ports " + scanResult._2.first + " through " + scanResult._2.last)
            results = results ::: List(scanResult)

          //check if results have been returned for each actor
          case TIMEOUT =>
            println("Main actor timed out")
            if (results.size == numProcs.size) wrapUp

          case _ =>
            println("got back something weird from one of the port scan actors!")
            wrapUp
        }
      }
    }    

    //Attempt to open a socket on each port in the given range
    //returns a Tuple3[procID: Int, ports: Range, time: Long
    def scan(proc: Int, ports: Range) = {
      val startTime = System.nanoTime()
      ports.foreach { n =>
        try {
          //println("Processor " + proc + " is checking port " + n)
          print(".")

          val socket = new Socket(host, n)

          //println("Found open port: " + n)
          openPorts += n

          socket.close
        } catch {
          case e: Exception =>
          //println("While scanning port " + n + " caught Exception: " + e)
        }
      }
      (proc, ports, startTime - System.nanoTime())
    }

    //output results and kill the main actor
    def wrapUp {
      println("These are the open ports in the range 1-1024:")
      openPorts.foreach { port => println(port) }
      results.foreach { result => if (result._3 > longestRunTime) { longestRunTime = result._3 } }
      println("Time to scan ports 1 through 1024 is: %3.3f".format(longestRunTime / 1000))
      caller ! exit
    }
  }
}

为每个处理器生成参与者的代码工作正常,扫描方法为每个端口范围正确调用,我已经确认扫描了 1024 个端口。
我的期望是这条线:

caller ! scan(proc._1, proc._2)  

应该将包含 (Int, Range, Long) 的消息发送回主要参与者。然后主要参与者的 act 方法应该捕获该消息并执行:

          case scanResult: Tuple3[Int, Range, Long] =>
            println("Processor " + scanResult._1 + " completed scan of ports " + scanResult._2.first + " through " + scanResult._2.last)
            results = results ::: List(scanResult)  

然而,这并没有发生。事实上,据我所知,我的任何信息都没有返回给主角。我不清楚我错过了什么或做错了什么。

4

1 回答 1

2

您注意到的问题是没有收到正在发送的消息。这是因为创建的actor {caller ! scan(proc._1, proc._2)}参与者没有act与之关联的定义。该def act{...}方法与创建的actor无关,因为它是一个方法LowPortScanner而不是actor。

保持代码的精神,您可以定义要由参与者执行的主体, actor{...}并将其分配给一个值并向其发送消息。

 //spawn an actor for each processor that scans a given port range
 procPortRanges.foreach { proc =>
   val myactor = actor {
     //catch results from the processor actors above
       loop {
         reactWithin(100) {
         //update the list of results returned from scan
         case scanResult: Tuple3[Int, Range, Long] =>
           println("Processor " + scanResult._1 + " completed scan of ports " + scanResult._2.first + " through " + scanResult._2.last)
           results = results ::: List(scanResult)

         //check if results have been returned for each actor
         case TIMEOUT =>
           println("Main actor timed out")
           if (results.size == numProcs.size) wrapUp

         case _ =>
           println("got back something weird from one of the port scan actors!")
           wrapUp
        }
      }
     //catch results from the processor actors above
   } //end inner actors
     myactor ! scan(proc._1, proc._2)
 } //end numProcs.foreach

另一种方法是扩展 Actor 特征。这样,def act{...}将处理参与者收到的消息LowPortScanner。我还做了一些小的重构,包括使用this而不是self. Actor 也必须以this.start().

import java.net.Socket
import scala.actors._
import Actor._
import scala.collection.mutable.ArrayBuffer

object LowPortScanner extends Actor {

  var lastPort = 0
  var openPorts = ArrayBuffer[Int]()
  var longestRunTime = 00.00
  var results = List[Tuple3[Int, Range, Long]]()

  val host = "localhost"
  val numProcs = 1 to Runtime.getRuntime().availableProcessors()
  val portsPerProc = 1024 / numProcs.size

  val procPortRanges = numProcs.foldLeft(List[Tuple2[Int, Range]]()) { (portRanges, proc) =>
    val tuple2 = (proc.toInt, (lastPort + 1) to (lastPort + portsPerProc))
    lastPort += portsPerProc
    tuple2 :: portRanges
  }
    //catch results from the processor actors above
    def act {
      loop {
        reactWithin(100) {
          //update the list of results returned from scan
          case scanResult: Tuple3[Int, Range, Long] =>
            println("Processor " + scanResult._1 + " completed scan of ports " + scanResult._2.first + " through " + scanResult._2.last)
            results = results ::: List(scanResult)

          //check if results have been returned for each actor
          case TIMEOUT =>
            println("Main actor timed out")
            if (results.size == numProcs.size) wrapUp

          case _ =>
            println("got back something weird from one of the port scan actors!")
            wrapUp
        }
      }
    }

    //Attempt to open a socket on each port in the given range
    //returns a Tuple3[procID: Int, ports: Range, time: Long
    def scan(proc: Int, ports: Range) = {
      val startTime = System.nanoTime()
      ports.foreach { n =>
        try {
          //println("Processor " + proc + " is checking port " + n)
          print(".")

          val socket = new Socket(host, n)

          //println("Found open port: " + n)
          openPorts += n

          socket.close
        } catch {
          case e: Exception =>
          //println("While scanning port " + n + " caught Exception: " + e)
        }
      }
      (proc, ports, startTime - System.nanoTime())
    }

    //output results and kill the main actor
    def wrapUp {
      println("These are the open ports in the range 1-1024:")
      openPorts.foreach { port => println(port) }
      results.foreach { result => if (result._3 > longestRunTime) { longestRunTime = result._3 } }
      println("Time to scan ports 1 through 1024 is: %3.3f".format(longestRunTime / 1000))
      this ! exit
    }

  def main(args: Array[String]): Unit = {

    //spawn an actor for each processor that scans a given port range
    this.start()
    procPortRanges.foreach { proc =>
      actor {
        this ! scan(proc._1, proc._2)
      } //end inner actors
    } //end numProcs.foreach  
  }

}

以下是运行的结果:

scala> LowPortScanner.main(Array[String]())

scala> ...Processor 6 completed scan of ports 641 through 768
...Processor 5 completed scan of ports 513 through 640
...Processor 4 completed scan of ports 385 through 512
...Processor 3 completed scan of ports 257 through 384
...Processor 1 completed scan of ports 1 through 128
...Processor 7 completed scan of ports 769 through 896
...Processor 2 completed scan of ports 129 through 256
...Processor 8 completed scan of ports 897 through 1024
Main actor timed out
These are the open ports in the range 1-1024:
139
22
445
591
631
111
Time to scan ports 1 through 1024 is: 0.000


scala> LowPortScanner.results
res2: List[(Int, Range, Long)] = List((6,Range(641, 642, 643,...
于 2012-11-09T23:06:40.333 回答