作为课程项目的一部分,我正在尝试使用选择性重复在 java 中的基本 UDP 协议上实现一层可靠性:http ://en.wikipedia.org/wiki/Selective_Repeat_ARQ 。基本上,每个数据包 - 在发送时 - 在单独的线程中跟踪自己的计时器。如果任何特定计时器用完,则重新发送数据包。
对于相对较大的超时设置(例如 500 毫秒),此代码执行良好,并且大文件完全发送到接收器。但是,如果设置了较低的超时时间(例如 20 毫秒),我会收到以下向终端发送垃圾邮件的错误:
java.nio.channels.ClosedChannelException
at sun.nio.ch.DatagramChannelImpl.ensureOpen(DatagramChannelImpl.java:132)
at sun.nio.ch.DatagramChannelImpl.send(DatagramChannelImpl.java:241)
at Sender4.sendPak(Sender4.java:118)
at Sender4.access$000(Sender4.java:8)
at Sender4$packetTimer.run(Sender4.java:135)
然而,据我所见,该频道并未关闭。此异常的文档指出:
当尝试在已关闭或至少对该操作关闭的通道上调用或完成 I/O 操作时抛出已检查异常。抛出此异常并不一定意味着通道已完全关闭。例如,写入一半已关闭的套接字通道可能仍处于打开状态以供读取。
这让我认为它可能已关闭,因为它在某些方面不可用。由于它只发生在较小的超时值,也许这是因为两个线程试图同时重新发送?但是,发送(sendPak)的方法是同步的......所以这应该是不可能的。
是什么导致了这个问题?或者我可以使用什么修复程序来避免遇到这个问题?这是我程序的发送方部分的代码,我很确定接收方没问题:
/* Craig Innes 0929508 */
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.nio.channels.*;
public class Sender4 {
short base = 0;
short nextSeqNum = 0;
byte[][] packets;
ByteBuffer bb;
String endSys;
int portNum;
String fileName;
int retryTime;
int windowSize;
DatagramSocket clientSocket;
InetAddress IPAddress;
InetSocketAddress destination;
boolean timedOut = false;
int resends = 0;
HashMap<Short, packetTimer> timers = new HashMap<Short, packetTimer>();
DatagramChannel clientChannel;
public Sender4(String endSys, int portNum, String fileName, int retryTime, int windowSize){
this.endSys = endSys;
this.portNum = portNum;
this.fileName = fileName;
this.retryTime = retryTime;
this.windowSize = windowSize;
}
public static void main(String args[]) throws Exception{
//Check for current arguments and assign them
if(args.length != 5){
System.out.println("Invalid number of arguments. Please specify: <endSystem> <portNumber> <fileName> <retryTimeout><windowSize>");
System.exit(1);
}
Sender4 sendy = new Sender4(args[0], Integer.parseInt(args[1]), args[2], Integer.parseInt(args[3]), Integer.parseInt(args[4]));
sendy.go();
}
private void go() throws Exception{
clientChannel = DatagramChannel.open();
clientChannel.configureBlocking(false);
bb = ByteBuffer.allocate(2);
byte[] picData = new byte[1021];
byte[] sendData = new byte[1024];
byte[] seqBytes = new byte[2];
byte EOFFlag = 0;
boolean acknowledged = false;
int resends = 0;
IPAddress = InetAddress.getByName(endSys);
destination = new InetSocketAddress(IPAddress, portNum);
FileInputStream imReader = new FileInputStream(new File(fileName));
double fileSizeKb = imReader.available() / 1021.0; //We add 3 bytes to every packet, so dividing by 1021 will give us total kb sent.
int packetsNeeded = (int) Math.ceil(fileSizeKb);
packets = new byte[packetsNeeded][];
long startTime = System.currentTimeMillis();
long endTime;
double throughput;
//Create array of packets to send
for(int i = 0; i < packets.length; i++){
if(i == packets.length - 1){
EOFFlag = 1;
picData = new byte[imReader.available()];
sendData = new byte[picData.length + 3];
}
imReader.read(picData);
bb.putShort((short)i);
bb.flip();
seqBytes = bb.array();
bb.clear();
System.arraycopy(seqBytes, 0, sendData, 0, seqBytes.length);
sendData[2] = EOFFlag;
System.arraycopy(picData, 0, sendData, 3, picData.length);
packets[i] = (byte[])sendData.clone();
}
//System.out.println("timeout is: " + timedOut + " base is: " + base + " packet length is: " + packets.length + " nextSeqNum: " + nextSeqNum);
while(base != packets.length || !timers.isEmpty()){
while(nextSeqNum - base < windowSize && nextSeqNum < packets.length){
System.out.println("sending packet with seqNum: " + nextSeqNum);
sendPak(nextSeqNum);
timers.put(nextSeqNum, new packetTimer(nextSeqNum));
timers.get(nextSeqNum).start();
System.out.println("nextSeq: " + nextSeqNum + "base " + base + "windowSize " + windowSize + "timer size" + timers.size());
nextSeqNum++;
}
//Done all the sending we can, have a check for any ACKs we have received...
getACK();
}
endTime = System.currentTimeMillis();
throughput = 1000 * fileSizeKb / (endTime - startTime);
clientChannel.close();
imReader.close();
System.out.println("Number of retransmissions: " + resends);
System.out.println("Average throughput is: " + throughput + "Kb/s");
}
private synchronized void sendPak(short resNum) throws IOException{
//System.out.println("Timed out waiting for acknowledgement, resending all unACKed packets in window");
ByteBuffer sendBuff = ByteBuffer.wrap(packets[resNum]);
clientChannel.send(sendBuff, destination);
sendBuff.clear();
}
private class packetTimer extends Thread{
short sendingNum;
boolean timeToStop = false;
boolean fileACKed = false;
public packetTimer(short seqNum){
sendingNum = seqNum;
}
public void run() {
//If packet times out - resend. If thread interrupted, we have received the corresponding ack
while(waitForACK()){
System.out.println("Packet timed out. Resending packet: " + sendingNum);
try{
sendPak(sendingNum);
}catch(IOException ex){
System.out.println("I think this is causing the problems");
ex.printStackTrace();
}
}
System.out.println("Thread" + sendingNum + "has reached completion");
}
private boolean waitForACK(){
if(this.interrupted()){
return false;
}
try{
Thread.sleep(retryTime);
}catch(InterruptedException ex){
return false;
}
return true;
}
}
private synchronized void getACK() throws Exception{
//Listen out for ACKs and update pointers accordingly
ByteBuffer ackBuff;
byte[] ackData = new byte[2];
ackBuff = ByteBuffer.wrap(ackData);
SocketAddress recked = clientChannel.receive(ackBuff);
if(recked != null){ //Only if it actually receives anything, check for nullity
//System.out.println("ACK buff size: " + ackBuff.capacity() + "Current position: " + ackBuff.position() + "remaining: " + ackBuff.remaining());
ackBuff.flip();
short ack = ackBuff.getShort();
System.out.println("ack received: " + ack);
ackBuff.clear();
if(timers.containsKey(ack)){ //Stop Timer
System.out.println("Interrupting timer: " + ack);
timers.get(ack).interrupt();
timers.get(ack).fileACKed = true;
}
if(base == ack){ //If you receive ack for the base, remove all the consecutively stopped timers
while(timers.containsKey(base) && timers.get(base).fileACKed){
System.out.println("Removing: " + base);
timers.remove(base);
base++;
}
}
//System.out.println("acknowledgement for base num: " + base + "ack num:" + ack);
}
System.out.println("Waiting for base: " + base + "packets length is " + packets.length + "timers size is: " + timers.size() + "but is it empty? " + timers.isEmpty());
Thread.yield();
}
}