2

作为课程项目的一部分,我正在尝试使用选择性重复在 java 中的基本 UDP 协议上实现一层可靠性:http ://en.wikipedia.org/wiki/Selective_Repeat_ARQ 。基本上,每个数据包 - 在发送时 - 在单独的线程中跟踪自己的计时器。如果任何特定计时器用完,则重新发送数据包。

对于相对较大的超时设置(例如 500 毫秒),此代码执行良好,并且大文件完全发送到接收器。但是,如果设置了较低的超时时间(例如 20 毫秒),我会收到以下向终端发送垃圾邮件的错误:

java.nio.channels.ClosedChannelException
at sun.nio.ch.DatagramChannelImpl.ensureOpen(DatagramChannelImpl.java:132)
at sun.nio.ch.DatagramChannelImpl.send(DatagramChannelImpl.java:241)
at Sender4.sendPak(Sender4.java:118)
at Sender4.access$000(Sender4.java:8)
at Sender4$packetTimer.run(Sender4.java:135)

然而,据我所见,该频道并未关闭。此异常的文档指出:

当尝试在已关闭或至少对该操作关闭的通道上调用或完成 I/O 操作时抛出已检查异常。抛出此异常并不一定意味着通道已完全关闭。例如,写入一半已关闭的套接字通道可能仍处于打开状态以供读取。

这让我认为它可能已关闭,因为它在某些方面不可用。由于它只发生在较小的超时值,也许这是因为两个线程试图同时重新发送?但是,发送(sendPak)的方法是同步的......所以这应该是不可能的。

是什么导致了这个问题?或者我可以使用什么修复程序来避免遇到这个问题?这是我程序的发送方部分的代码,我很确定接收方没问题:

/* Craig Innes 0929508 */
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.nio.channels.*;

public class Sender4 {
    short base = 0;
    short nextSeqNum = 0;
    byte[][] packets;
    ByteBuffer bb;
    String endSys;
    int portNum;
    String fileName;
    int retryTime;
    int windowSize;
    DatagramSocket clientSocket;
    InetAddress IPAddress;
    InetSocketAddress destination;
    boolean timedOut = false;
    int resends = 0;
    HashMap<Short, packetTimer> timers = new HashMap<Short, packetTimer>();
    DatagramChannel clientChannel;

    public Sender4(String endSys, int portNum, String fileName, int retryTime, int windowSize){
        this.endSys = endSys;
        this.portNum = portNum;
        this.fileName = fileName;
        this.retryTime = retryTime;
        this.windowSize = windowSize;
    }

    public static void main(String args[]) throws Exception{
        //Check for current arguments and assign them
        if(args.length != 5){
            System.out.println("Invalid number of arguments. Please specify: <endSystem> <portNumber> <fileName> <retryTimeout><windowSize>");
            System.exit(1);
        }

        Sender4 sendy = new Sender4(args[0], Integer.parseInt(args[1]), args[2], Integer.parseInt(args[3]), Integer.parseInt(args[4]));

        sendy.go();
    }

    private void go() throws Exception{

        clientChannel = DatagramChannel.open();
        clientChannel.configureBlocking(false);


        bb = ByteBuffer.allocate(2);
        byte[] picData = new byte[1021];
        byte[] sendData = new byte[1024];
        byte[] seqBytes = new byte[2];
        byte EOFFlag = 0;
        boolean acknowledged = false;
        int resends = 0;
        IPAddress = InetAddress.getByName(endSys);
        destination = new InetSocketAddress(IPAddress, portNum);

        FileInputStream imReader = new FileInputStream(new File(fileName));
        double fileSizeKb = imReader.available() / 1021.0; //We add 3 bytes to every packet, so dividing by 1021 will give us total kb sent. 
        int packetsNeeded = (int) Math.ceil(fileSizeKb);
        packets = new byte[packetsNeeded][];
        long startTime = System.currentTimeMillis();
        long endTime;
        double throughput;

        //Create array of packets to send
        for(int i = 0; i < packets.length; i++){
            if(i == packets.length - 1){
                EOFFlag = 1;
                picData = new byte[imReader.available()];
                sendData = new byte[picData.length + 3];
            }
            imReader.read(picData);
            bb.putShort((short)i);
            bb.flip();
            seqBytes = bb.array();
            bb.clear();
            System.arraycopy(seqBytes, 0, sendData, 0, seqBytes.length);
            sendData[2] = EOFFlag;
            System.arraycopy(picData, 0, sendData, 3, picData.length);
            packets[i] = (byte[])sendData.clone();
        }

        //System.out.println("timeout is: " + timedOut + " base is: " + base + " packet length is: " + packets.length + " nextSeqNum: " + nextSeqNum);

        while(base != packets.length || !timers.isEmpty()){

            while(nextSeqNum - base < windowSize && nextSeqNum < packets.length){
                System.out.println("sending packet with seqNum: " + nextSeqNum);
                sendPak(nextSeqNum);
                timers.put(nextSeqNum, new packetTimer(nextSeqNum));
                timers.get(nextSeqNum).start();
                System.out.println("nextSeq: " + nextSeqNum + "base " + base + "windowSize " + windowSize + "timer size" + timers.size());
                nextSeqNum++;
            }           

            //Done all the sending we can, have a check for any ACKs we have received...
            getACK();

        }

        endTime = System.currentTimeMillis();
        throughput = 1000 * fileSizeKb / (endTime - startTime);
        clientChannel.close();
        imReader.close();
        System.out.println("Number of retransmissions: " + resends);
        System.out.println("Average throughput is: " + throughput + "Kb/s");

    }

    private synchronized void sendPak(short resNum) throws IOException{
        //System.out.println("Timed out waiting for acknowledgement, resending all unACKed packets in window");
            ByteBuffer sendBuff = ByteBuffer.wrap(packets[resNum]);
            clientChannel.send(sendBuff, destination);
            sendBuff.clear();
    }

    private class packetTimer extends Thread{
        short sendingNum;
        boolean timeToStop = false;
        boolean fileACKed = false;
        public packetTimer(short seqNum){
            sendingNum = seqNum;
        }
        public void run() {
            //If packet times out - resend. If thread interrupted, we have received the corresponding ack
                while(waitForACK()){
                    System.out.println("Packet timed out. Resending packet: " + sendingNum);

                    try{
                        sendPak(sendingNum);
                    }catch(IOException ex){
                        System.out.println("I think this is causing the problems");
                        ex.printStackTrace();
                    }
                }

            System.out.println("Thread" + sendingNum + "has reached completion");
        }

        private boolean waitForACK(){
            if(this.interrupted()){
                return false;
            }
            try{
                Thread.sleep(retryTime);
            }catch(InterruptedException ex){
                return false;
            }
            return true;
        }
    }


    private synchronized void getACK() throws Exception{
        //Listen out for ACKs and update pointers accordingly
        ByteBuffer ackBuff;
        byte[] ackData = new byte[2];
        ackBuff = ByteBuffer.wrap(ackData);
        SocketAddress recked = clientChannel.receive(ackBuff);
        if(recked != null){ //Only if it actually receives anything, check for nullity
            //System.out.println("ACK buff size: " + ackBuff.capacity() + "Current position: " + ackBuff.position() + "remaining: " + ackBuff.remaining());
            ackBuff.flip();
            short ack = ackBuff.getShort();
            System.out.println("ack received: " + ack);
            ackBuff.clear();

            if(timers.containsKey(ack)){ //Stop Timer
                System.out.println("Interrupting timer: " + ack);
                timers.get(ack).interrupt();
                timers.get(ack).fileACKed = true;
            }

            if(base == ack){ //If you receive ack for the base, remove all the consecutively stopped timers
                while(timers.containsKey(base) && timers.get(base).fileACKed){
                    System.out.println("Removing: " + base);
                    timers.remove(base);
                    base++;
                }
            }
            //System.out.println("acknowledgement for base num: " + base + "ack num:" + ack);
        }

        System.out.println("Waiting for base: " + base + "packets length is " + packets.length + "timers size is: " + timers.size() + "but is it empty? " + timers.isEmpty());
        Thread.yield();
    }
}
4

0 回答 0