0

我正在编写一个程序来测试 Java 的网络 API(旧 io vs nio vs nio2)。

我有一个只发送两个值的服务器:

  1. System.nanoTime()
  2. 计数器计算发送的消息数。

客户端接收此数据,将远程 System.nanoTime() 与本地时间戳进行比较以计算延迟并检查计数器以确保没有数据丢失。

由于这只是一个测试,服务器和客户端运行在同一个 JVM 中。90% 的时间数据被正确传输;但是,每隔一段时间,时间戳就会完全错误。看起来它可能是一个上溢/下溢错误,但我看不出它是如何引入的。以下是错误示例:

错误:计数器 3,remoteTS -8267580102784516096,localTS 155321716184402,差异 8267735424500700498

请注意,本地时间戳 155321716184402 转换为晚上 7 点之后。但是远程时间戳只是负数!如果您查看代码,我没有做任何花哨的日期数学,它不可能是负面的。我也看不出我怎么会得到一个溢出错误。我认为这可能是由于大端与小端,但是每个值都是错误的,而不仅仅是其中一些。

代码(从稍大的测试中提取)如下:

package networkioshootout;

import static java.lang.System.out;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutionException;


public class DebugNetwork {
  private final static int SENDCOUNT = 100;
    private final static int PORT = 9000;
    private final static int TESTLOOP = 10;
    private final static Random rn = new Random();

    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        long currentNanos = System.nanoTime();
        long currentMillis = System.currentTimeMillis();
        Date now = new Date();
        System.out.println(String.format("Current date/time:%s, nanos:%s, millis:%s",
                now, currentNanos, currentMillis));

        //Server
        new Server().start();

        //Client
        for(int i=0; i< TESTLOOP; i++){
            final int DATASIZE = (1+rn.nextInt(99))*8;
            clientInputstream(DATASIZE);
        }
    }

    private static void clientInputstream(int bufferSize) throws IOException, UnknownHostException {
        final byte[] internalBuffer = new byte[bufferSize+16] ;
        final ByteBuffer longExtractor = ByteBuffer.allocate(16);

        int bytesReadSoFar = 0;
        long counter = 0;

        Socket client = new Socket(InetAddress.getLocalHost(), PORT);
        InputStream in = client.getInputStream();

        byte[] data = new byte[bufferSize];
        int size = 0;

        try{
            while(-1 != (size = in.read(data))){
                for(int i=0; i < size; i++){
                    internalBuffer[i+bytesReadSoFar] = data[i];
                }
                bytesReadSoFar += size;

                if(bytesReadSoFar >= 16){
                    int values = bytesReadSoFar/16;
                    int toRead = values;
                    int remainder = bytesReadSoFar % 16;

                    for(int i=0; i< toRead; i++){
                        int j = i * 16;

                        //long remoteTS = ByteBuffer.wrap(new byte[]{internalBuffer[j+0],internalBuffer[j+1],internalBuffer[j+2],internalBuffer[j+3],internalBuffer[j+4],internalBuffer[j+5],internalBuffer[j+6],internalBuffer[j+7]}).getLong();
                        //long remoteCounter = ByteBuffer.wrap(new byte[]{internalBuffer[j+8],internalBuffer[j+9],internalBuffer[j+10],internalBuffer[j+11],internalBuffer[j+12],internalBuffer[j+13],internalBuffer[j+14],internalBuffer[j+15]}).getLong();

                        //long remoteTS = data[0] | ((int)(data[1]) << 4) | ((int)(data[1]) << 8) | ((int)(data[1]) << 12) | ((int)(data[1]) << 16) | ((int)(data[1]) << 20) | ((int)(data[1]) << 24) ;

                        longExtractor.put(internalBuffer, j, 16);
                        longExtractor.flip();
                        long remoteTS = longExtractor.getLong();
                        long remoteCounter = longExtractor.getLong();
                        longExtractor.clear();

                        if(remoteCounter != counter){
                            String error = "ERROR: Expected remote counter to be "+counter+" but it was actually "+remoteCounter;
                            //System.out.println(error);
                            throw new RuntimeException(error);
                        }
                        counter++;

                        long localTS = System.nanoTime();
                        long latency = localTS - remoteTS;
                        if(Math.abs(latency) > 1200000000) {
                            out.println(String.format("ERROR: counter %s, remoteTS %s, localTS %s, diff %s",
                                    counter, remoteTS, localTS, latency));
                            continue;
                        }


                    }

                    //System.arraycopy(data, toRead, data, 0, remainder);
                    for(int i=0; i < remainder; i++){
                        internalBuffer[i] = internalBuffer[i+toRead];
                    }
                    bytesReadSoFar = remainder;
                }
            }
        }
        finally{
            client.close();
        }
    }

    static final class Server extends Thread{

        public void run(){
            try {
                startServer();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private static void startServer() throws IOException {
            final ServerSocket server = new ServerSocket(PORT);

            //System.out.println("Server listening on port "+PORT);

            while(true){
                final Socket c1 = server.accept();
                c1.setTcpNoDelay(true);
                //System.out.println("Client connected");
                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        long totalMsgs = 0;
                        long counter = 0;
                        DataOutputStream serverout;
                        try {
                            serverout = new DataOutputStream(c1.getOutputStream());
                            for(int i=0;i<SENDCOUNT;i++){ 
                                serverout.writeLong(System.nanoTime());
                                serverout.writeLong(counter);
                                totalMsgs++;
                                counter++;
                            }
                            //System.out.println("Sent bytes to client: "+total);
                        } catch (IOException e) {
                            out.println("Messages sent:"+totalMsgs+", current counter:"+counter);
                            e.printStackTrace();
                        }
                        finally{
                            //System.out.println("Client disconnected when counter was "+counter);
                            try { c1.close(); } catch (IOException e) { e.printStackTrace();}
                        }
                    }
                }).start();
            }
        }
    }

}

编辑:由于对此有一些评论,实际程序有客户端通过输入流、缓冲流、NIO、NIO2 连接到服务器。这是该程序的更完整(但已过时)版本: https ://gist.github.com/falconair/4975243

我还没有添加数据输入流,尝试使用套接字选项等。我想在进一步移动之前解决数据损坏问题。

4

2 回答 2

1

该错误与您使用data[]internalBuffer[]所有数据洗牌有关。我没有看到真正的客户是使用类似这样的代码编写的。任何理智的人都会使用BufferedInputStream.

如果您想测试不同缓冲区大小的效果,请使用new DataInputStream(new BufferedInputStream(socket.getInputStream(), bufferSize))and并完全readLong()摆脱dataand :它们只会导致不相关的问题。internalBufferlongExtractor

以下作品完美无瑕:

private static void clientInputstream(int bufferSize) throws IOException, UnknownHostException
{
    long counter = 0;

    Socket client = new Socket(InetAddress.getLocalHost(), PORT);
    DataInputStream in = new DataInputStream(new BufferedInputStream(client.getInputStream(), bufferSize));

    try
    {
        for (;;)
        {
            long remoteTS = in.readLong();
            long remoteCounter = in.readLong();
            if (remoteCounter != counter)
            {
                String error = "ERROR: Expected remote counter to be " + counter + " but it was actually " + remoteCounter;
                //System.out.println(error);
                throw new RuntimeException(error);
            }
            counter++;

            long localTS = System.nanoTime();
            long latency = localTS - remoteTS;
            if (Math.abs(latency) > 1200000000)
            {
                out.println(String.format("ERROR: counter %s, remoteTS %s, localTS %s, diff %s",
                    counter, remoteTS, localTS, latency));
                continue;
            }
        }
    }
    catch (EOFException exc)
    {
        System.out.println("EOS");
    }
    finally
    {
        client.close();
    }
}
于 2013-02-20T23:07:28.943 回答
0

返回的值System.nanoTime()特定于正在运行的 JVM。您应该改用 System.currentTimeMillis() 。

此方法只能用于测量经过的时间,与系统或挂钟时间的任何其他概念无关。返回的值表示自某个固定但任意的原始时间以来的纳秒(可能在将来,因此值可能为负数)。在 Java 虚拟机实例中,此方法的所有调用都使用相同的来源;其他虚拟机实例可能使用不同的来源。

编辑:

由于您在同一个 JVM 中运行测试,因此 (this) 错误的来源必须与上述不同(尽管您应该考虑使用“currentTimeMillis”,以便在不同的 JVM 之间进行比较)。

我建议使用 BufferedInputStream 缓冲流,然后一次读取和处理 N(16?)个字节的块。

 Socket client = new Socket(InetAddress.getLocalHost(), PORT);
 InputStream in = new BufferedInputStream(client.getInputStream());

 int length = 16, offset=0;     
 while (length>0) {
   int read = in.read(data,offset,length);
   if (read<0) ... //connection error
   offset+=read;
   length-=read;
 }
于 2013-02-20T20:51:53.257 回答