在浏览了有关我的问题的其他一些线程后,我想我已经明白我需要重新设计我的应用程序。但只是为了澄清:我TCP/IP在客户端和服务器之间有一个连接。在客户端,有许多线程同时运行。这些线程中的一个或多个随机使用该TCP/IP连接与服务器进行通信。我发现,例如,当长时间运行的文件传输处于活动状态时,同时使用与另一个线程的连接可能会导致错误。虽然我在每条消息之前都有一个特定的标题,包括数据长度,但在我看来,IPstack 有时会向我的程序传递多条消息的混合,这意味着尽管一条消息尚未完全传递,但另一条消息的一部分将传递给我的 read 方法。这是符合预期TCP/IP行为的正确观察吗?提前致谢 - 马里奥

++++++++++++++++++++++++++++++++++++++++++++++++++++++ ++++++++++++++++++++++++++++++

对于任何感兴趣的人:以下是我的测试程序的源代码。您可以使用不同的 BUFFER_SIZE 值和用于轰炸服务器套接字的线程数,并发 TCP/IP 发送使用相同的套接字。我省略了一些错误处理并删除了更复杂的终止,包括关闭套接字。使用大于 64KB 的 BUFFER_SIZE 进行测试总是会导致我的机器出错。

import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;

public class TCPTest
  private final static String INPUT_FILE   = "c:/temp/tcptest.in";
  private final static int    BUFFER_SIZE  = 64 * 1024 - 8; //65536;
  private final static int    MESSAGE_SIZE = 512 * 64 * 1024;
  private final static int    THREADS      = 3;
  private final static int    SIZE_OF_INT  = 4;
  private final static int    LENGTH_SIZE  = SIZE_OF_INT;
  private final static int    ID_SIZE      = SIZE_OF_INT;
  private final static int    HEADER_SIZE  = LENGTH_SIZE + ID_SIZE;
  private final static String NEW_LINE     = System.getProperty("line.separator");

  private ServerSocket m_serverSocket = null;
  private Socket       m_clientSocket = null;
  private int          m_iThreadCounter;

  public static void main(String[] args)  
    new TCPTest();
  } // main

  public TCPTest() 
    final String id = "ReaderThread[*]";
    // start a new thread creating a server socket waiting for connections 
    new Thread(new Runnable() 
      public void run() 
          // create server socket and accept client requests
          m_serverSocket = new ServerSocket(12345);
          m_clientSocket = m_serverSocket.accept();
          // client request => prepare and read data
          long       startTime       = System.currentTimeMillis();
          byte[]     buffer          = new byte[BUFFER_SIZE];
          ByteBuffer header          = ByteBuffer.allocate(HEADER_SIZE);
          int        iTotalBytesRead = 0;
          boolean    fTerminate      = false;
          int        iBytesRead;
          // get hold of socket's input stream
          InputStream clientInputStream = m_clientSocket.getInputStream();
          // loop
          while (false == fTerminate)
            // loop to read next header
            for (int i = 0; i < HEADER_SIZE; i++)
              clientInputStream.read(header.array(), i, 1);
            // get information of interest
            int iLength      = header.getInt();
            int iId          = header.getInt();
            int iLengthSoFar = 0;
            int iBytesLeft   = iLength;
            int iBytesToRead;
            // any length given?
            if ((0 < iLength) && (BUFFER_SIZE >= iLength))
              // that's the case => read complete message
              while (iLengthSoFar < iLength)
                // calculate number of bytes left
                iBytesLeft = iLength - iLengthSoFar;
                 // calculate maximum number of bytes to read
                if (iBytesLeft > BUFFER_SIZE)
                  iBytesToRead = BUFFER_SIZE;
                  iBytesToRead = iBytesLeft;
                // read next portion of bytes
                if ((iBytesRead = clientInputStream.read(buffer, 0, iBytesToRead)) != -1)
                  // maintain statistics
                  iTotalBytesRead += iBytesRead;
                  iLengthSoFar += iBytesRead;
                } // if
                  // finish => print message
                  System.out.println("==> "+id+": ERROR length=<-1> received " +
                        "for id=<"+iId+">");
                  fTerminate = true;
                } // else
              } // while
            } // if
              System.out.println("==> "+id+": ERROR data length <= 0 for id=<"+iId+">");
              dump(header, 0, HEADER_SIZE / SIZE_OF_INT, "Error header");
            } // else
          } // while
          System.out.println("==> "+id+": "+ iTotalBytesRead + " bytes read in " 
                              + (System.currentTimeMillis() - startTime) + " ms.");
        } // try 
        catch (IOException e) 
        } // catch
      } // run
    // create the socket writer threads
      // ensure server is brought up and request a connection
      System.out.println("==> "+id+": just awoke");
      Socket       socket             = new Socket("localhost", 12345);
      OutputStream socketOutputStream = socket.getOutputStream();
      System.out.println("==> "+id+": socket obtained");
      // create some writer threads
      for (int i = 0; i < THREADS; i++)
        // create a new socket writer and start the thread
        (new SocketWriter(socket, 
                          new String("WriterThread["+(i+1)+"]"),
    } // try
    catch (Exception e)
    } // catch
  } // TCPTestEx

  private final static void dump(ByteBuffer bb, int iOffset, int iInts, String header)
    for (int i = 0; i < iInts; i++)
      System.out.print(" " + Integer.toHexString(bb.getInt()).toUpperCase());
  } // dump

  private class SocketWriter extends Thread
    Socket       m_socket;
    int          m_iId;
    int          m_iBufferSize;
    String       m_id;
    OutputStream m_os;

    protected SocketWriter(Socket socket, int iId, int iBufferSize, String id, OutputStream os)
      m_socket       = socket;
      m_iId          = iId;
      m_iBufferSize  = iBufferSize;
      m_id           = id;
      m_os           = os;
      // increment thread counter
      synchronized (m_serverSocket)
      } // synchronized
    } // SocketWriter

    public final void run()
        long       startTime        = System.currentTimeMillis();
        ByteBuffer buffer           = ByteBuffer.allocate(m_iBufferSize + HEADER_SIZE); 
        int        iTotalBytesRead  = 0;
        int        iNextMessageSize = 512 * m_iBufferSize; 
        int        iBytesRead;
        // open input stream for file to read and send
        FileInputStream fileInputStream = new FileInputStream(INPUT_FILE);
        System.out.println("==> "+m_id+": file input stream obtained");
        // loop to read complete file
        while (-1 != (iBytesRead = fileInputStream.read(buffer.array(), HEADER_SIZE, m_iBufferSize))) 
          // add length and id to buffer and write over TCP
          buffer.putInt(0, iBytesRead);
          buffer.putInt(LENGTH_SIZE, m_iId);
          m_os.write(buffer.array(), 0, HEADER_SIZE + iBytesRead);
          // maintain statistics and print message if so desired
          iTotalBytesRead += iBytesRead;
          if (iNextMessageSize <= iTotalBytesRead)
            System.out.println("==> "+m_id+": <"+iTotalBytesRead+"> bytes processed");
            iNextMessageSize += MESSAGE_SIZE;
          } // if
        } // while
        // close my file input stream
        System.out.println("==> "+m_id+": file input stream closed");
        System.out.println("==> "+m_id+": <"+ iTotalBytesRead + "> bytes written in " 
                            + (System.currentTimeMillis() - startTime) + " ms.");
        // decrement thread counter
        synchronized (m_serverSocket)
          // last thread?
          if (0 >= m_iThreadCounter)
            // that's the case => terminate
        } // synchronized
      } // try 
      catch (Exception e) 
      } // catch
    } // run
  } // SocketWriter
} // TCPTest

1 回答 1


耶。TCP 是一种面向字节的流协议。这意味着应用程序接收到一个(未定界的)字节流。“消息”的概念应该由应用程序提供(或使用面向消息的协议)。

于 2013-02-18T15:36:21.897 回答