计算机网络软件开发课程的家庭作业,教授让我们为端口 1-1024 构建一个端口扫描器,以便在本地主机上运行。练习的重点是使用参与者演示任务级别的并行性。教授提供了按顺序扫描每个端口的代码。我们将创建一个并行执行此操作的版本,其中每个处理器或系统可用的超线程都有一个参与者。目标是获得时间来完成对所有端口 1-1024 的完整扫描,并将并行扫描的结果与串行扫描的结果进行比较。这是我的并行扫描代码:
import java.net.Socket
import scala.actors._
import Actor._
import scala.collection.mutable.ArrayBuffer
object LowPortScanner {
var lastPort = 0
var openPorts = ArrayBuffer[Int]()
var longestRunTime = 00.00
var results = List[Tuple3[Int, Range, Double]]()
val host = "localhost"
val numProcs = 1 to Runtime.getRuntime().availableProcessors()
val portsPerProc = 1024 / numProcs.size
val caller = self
def main(args: Array[String]): Unit = {
//spawn an actor for each processor that scans a given port range
numProcs.foreach { proc =>
actor {
val portRange: Range = (lastPort + 1) to (lastPort + portsPerProc)
lastPort = lastPort + portsPerProc
caller ! scan(proc, portRange)
}
}
//catch results from the processor actors above
def act {
loop {
reactWithin(100) {
//update the list of results returned from scan
case scanResult: Tuple3[Int, Range, Double] =>
results = results ::: List(scanResult)
//check if all results have been returned for each actor
case TIMEOUT =>
if (results.size == numProcs.size) wrapUp
case _ =>
println("got back something weird from one of the port scan actors!")
wrapUp
}
}
}
//Attempt to open a socket on each port in the given range
//returns a Tuple3[procID: Int, ports: Range, time: Double
def scan(proc: Int, ports: Range) {
val startTime = System.nanoTime()
ports.foreach { n =>
try {
println("Processor " + proc + "is checking port " + n)
val socket = new Socket(host, n)
//println("Found open port: " + n)
openPorts += n
socket.close
} catch {
case e: Exception =>
//println("While scanning port " + n + " caught Exception: " + e)
}
}
(proc, ports, startTime - System.nanoTime())
}
//output results and kill the main actor
def wrapUp {
println("These are the open ports in the range 1-1024:")
openPorts.foreach { port => println(port) }
results.foreach { result => if (result._3 > longestRunTime) { longestRunTime = result._3} }
println("Time to scan ports 1 through 1024 is: %3.3f".format(longestRunTime / 1000))
caller ! exit
}
}
}
我有一个四核 i7,所以我的 numProcs = 8。在这个硬件平台上,每个 proc actor 应该扫描 128 个端口(1024/8 = 128)。我的意图是让 proc1 演员扫描 0 - 128,proc2 应该扫描 129-256,等等......但是,这不是正在发生的事情。一些演员最终与其他演员在同一范围内工作。下面的输出示例说明了这个问题:
处理器 2 正在检查端口 1
处理器 7 正在检查端口 385
处理器 1 正在检查端口 1
处理器 5 正在检查端口 1
处理器 4 正在检查端口 1
处理器 8 正在检查端口 129
处理器 3 正在检查端口 1
处理器 6 正在检查端口 257
处理器 1正在检查端口 2
处理器 5 正在检查端口 2
处理器 1 正在检查端口 3
处理器 3 正在检查端口 2
处理器 5 正在检查端口 3
处理器 1 正在检查端口 4
编辑
最终“工作”代码:
import java.net.Socket
import scala.actors._
import Actor._
import scala.collection.mutable.ArrayBuffer
object LowPortScanner {
var lastPort = 0
var openPorts = ArrayBuffer[Int]()
var longestRunTime = 00.00
var results = List[Tuple3[Int, Range, Double]]()
val host = "localhost"
val numProcs = 1 to Runtime.getRuntime().availableProcessors()
val portsPerProc = 1024 / numProcs.size
val caller = self
val procPortRanges = numProcs.foldLeft(List[Tuple2[Int, Range]]()) { (portRanges, proc) =>
val tuple2 = (proc.toInt, (lastPort + 1) to (lastPort + portsPerProc))
lastPort += portsPerProc
tuple2 :: portRanges
}
def main(args: Array[String]): Unit = {
//spawn an actor for each processor that scans a given port range
procPortRanges.foreach { proc =>
actor {
caller ! scan(proc._1, proc._2)
}
}
//catch results from the processor actors above
def act {
loop {
reactWithin(100) {
//update the list of results returned from scan
case scanResult: Tuple3[Int, Range, Double] =>
results = results ::: List(scanResult)
//check if results have been returned for each actor
case TIMEOUT =>
if (results.size == numProcs.size) wrapUp
case _ =>
println("got back something weird from one of the port scan actors!")
wrapUp
}
}
}
//Attempt to open a socket on each port in the given range
//returns a Tuple3[procID: Int, ports: Range, time: Double
def scan(proc: Int, ports: Range) {
val startTime = System.nanoTime()
ports.foreach { n =>
try {
println("Processor " + proc + "is checking port " + n)
val socket = new Socket(host, n)
//println("Found open port: " + n)
openPorts += n
socket.close
} catch {
case e: Exception =>
//println("While scanning port " + n + " caught Exception: " + e)
}
}
(proc, ports, startTime - System.nanoTime())
}
//output results and kill the main actor
def wrapUp {
println("These are the open ports in the range 1-1024:")
openPorts.foreach { port => println(port) }
results.foreach { result => if (result._3 > longestRunTime) { longestRunTime = result._3} }
println("Time to scan ports 1 through 1024 is: %3.3f".format(longestRunTime / 1000))
caller ! exit
}
}
}