0

当使用BufferedReader从套接字读取时, 它声明该readLine()方法返回

包含行内容的字符串,不包括任何行终止字符,如果已到达流的末尾,则为 null

它怎么知道它已经到达了流的末端?它使用什么字符序列来确定这一点。

我想模拟发送相同的字符序列以正确关闭另一个使用 PipedStreams 的连接。


编辑: 这是有问题的代码。从响应来看,似乎没有这样的序列,并且在 PipedOutput 流上调用 close() 应该会解除对输出流上的 readLine() 的阻塞。目前似乎没有这样做,这就是我感到困惑的原因,所以我认为这可能是其他地方的错误。

发生的事情是incomingEventIn.close()线路在阻塞时似乎inputLine = incomingEventIn.readLine()正在阻塞。如果inputLine = incomingEventIn.readLine()没有在另一个线程上执行,则incomingEventIn.close()执行正常。为什么会这样?

public class SocketManager {

    private Socket socket = null;
    private PrintWriter out = null;
    private BufferedReader in = null;

    private PipedOutputStream incomingEventOutStream = null;
    private PrintWriter incomingEventOut = null;
    private BufferedReader incomingEventIn = null;
    private PipedOutputStream incomingResponsOutStream = null;
    private PrintWriter incomingResponseOut = null;
    private BufferedReader incomingResponseIn = null;

    private ArrayList<AsteriskLiveComsEventListener> listeners = new ArrayList<AsteriskLiveComsEventListener>();
    private final ExecutorService eventsDispatcherExecutor;

    private String ip;
    private int port;

    private Object socketLock = new Object();

    public SocketManager(String ip, int port) {
        this.ip = ip;
        this.port = port;
        eventsDispatcherExecutor = Executors.newSingleThreadExecutor();
    }

    public void connect() throws UnableToConnectException, AlreadyConnectedException {
        synchronized(socketLock) {
            if (socket != null && !socket.isClosed()) {
                throw (new AlreadyConnectedException());
            }
            try {
                socket = new Socket(ip, port);
                out = new PrintWriter(socket.getOutputStream(), true);
                in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

                incomingEventOutStream = new PipedOutputStream();
                incomingEventIn = new BufferedReader(new InputStreamReader(new PipedInputStream(incomingEventOutStream)));
                incomingEventOut = new PrintWriter(incomingEventOutStream);

                incomingResponsOutStream = new PipedOutputStream();
                incomingResponseIn = new BufferedReader(new InputStreamReader(new PipedInputStream(incomingResponsOutStream)));
                incomingResponseOut = new PrintWriter(incomingResponsOutStream);

            } catch (IOException e) {
                throw (new UnableToConnectException());
            }
            new Thread(new IncomingEventThread()).start();
            new Thread(new SocketThread()).start();
        }
    }

    public void disconnect() throws NotConnectedException {
        disconnect(false);
    }

    private void disconnect(boolean notRequested) throws NotConnectedException {
        synchronized(socketLock) {
            if (!isConnected()) {
                throw (new NotConnectedException());
            }

            try {
                incomingEventIn.close();
            } catch (IOException e2) {}
            // IT NEVER GETS TO HERE!
            incomingEventOut.close();
            try {
                incomingResponseIn.close();
            } catch (IOException e1) {}
            System.out.println("disconnecting");
            incomingResponseOut.close();
            try {
                socket.shutdownInput();
            } catch (IOException e) {}
            try {
                socket.shutdownOutput();
            } catch (IOException e) {}
            try {
                socket.close();
            } catch (IOException e) {}

            if (notRequested) {

                System.out.println("disconnecting event");
                dispatchEvent(new ConnectionLostEvent());
            }
        }
    }

    public boolean isConnected() {
        synchronized(socketLock) {
            return (socket != null && !socket.isClosed());
        }
    }

    public void addEventListener(AsteriskLiveComsEventListener a) {
        synchronized(listeners) {
            listeners.add(a);
        }
    }

    public void removeEventListener(AsteriskLiveComsEventListener a) {
        synchronized(listeners) {
            listeners.remove(a);
        }
    }

    private void dispatchEvent(final AsteriskLiveComsEvent e) {
        synchronized (listeners) {
            synchronized (eventsDispatcherExecutor) {
                eventsDispatcherExecutor.execute(new Runnable()
                {
                    public void run()
                    {
                        for(int i=0; i<listeners.size(); i++) {
                            listeners.get(i).onAsteriskLiveComsEvent(e);
                        }
                    }
                });
            }
        }
    }

    public JSONObject sendRequest(JSONObject request) throws JSONException, NotConnectedException {
        synchronized(socketLock) {
            System.out.println("sending request "+request.toString());
            out.println(request.toString());
            try {
                return new JSONObject(incomingResponseIn.readLine());
            } catch (IOException e) {
                // lets close the connection
                try {
                    disconnect(true);
                } catch (NotConnectedException e1) {}
                throw(new NotConnectedException());
            }
        }
    }

private class SocketThread implements Runnable {

    @Override
    public void run() {
        String inputLine = null;
        try {
            while((inputLine = in.readLine()) != null) {
                // determine if this is a response or event and send to necessary location
                JSONObject lineJSON = new JSONObject(inputLine);
                if (lineJSON.getString("type").equals("response")) {
                    incomingResponseOut.println(inputLine);
                    incomingResponseOut.flush();
                }
                else if (lineJSON.getString("type").equals("event")) {
                    incomingEventOut.println(inputLine);
                    incomingEventOut.flush();
                }
            }

            if (isConnected()) {
                try {
                    disconnect(true);
                } catch (NotConnectedException e) {}
            }
        } catch (IOException e) {
            // try and disconnect (if not already disconnected) and end thread
            if (isConnected()) {
                try {
                    disconnect(true);
                } catch (NotConnectedException e1) {}
            }
        }
    }

}

private class IncomingEventThread implements Runnable {

    @Override
    public void run() {
        String inputLine = null;
        try {
            while((inputLine = incomingEventIn.readLine()) != null) {
                JSONObject lineJSON = new JSONObject(inputLine);
                String eventType = lineJSON.getString("eventType");
                // determine what type of event it is and then fire one that represents it
                if (eventType.equals("channelAdded")) {
                    JSONObject a = lineJSON.getJSONObject("payload");
                    Hashtable<String,Object> data = new Hashtable<String,Object>();
                    Object[] keys = a.keySet().toArray();
                    for(int i=0; i<keys.length; i++) {
                        data.put((String) keys[i], a.get((String) keys[i]));
                    }
                    dispatchEvent(new ChannelAddedEvent(data));
                }
                else if (eventType.equals("channelRemoved")) {
                    dispatchEvent(new ChannelRemovedEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
                }
                else if (eventType.equals("channelsToRoom")) {
                    ArrayList<Integer> data = new ArrayList<Integer>();
                    JSONObject a = lineJSON.getJSONObject("payload");
                    JSONArray ids = a.getJSONArray("channelIds");
                    for(int i=0; i<ids.length(); i++) {
                        data.add(ids.getInt(i));
                    }
                    dispatchEvent(new ChannelsToRoomEvent(data));
                }
                else if (eventType.equals("channelToHolding")) {
                    dispatchEvent(new ChannelToHoldingEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
                }
                else if (eventType.equals("channelVerified")) {
                    dispatchEvent(new ChannelVerifiedEvent(lineJSON.getJSONObject("payload").getInt("channelId")));
                }
                else if (eventType.equals("serverResetting")) {
                    dispatchEvent(new ServerResettingEvent());
                }
            }
        } catch (IOException e) {}
        System.out.println("here");
    }

}

编辑2: 我认为这是某个地方的死锁问题,因为如果我在调试器中放置一些断点,它运行良好并inputLine = incomingEventIn.readLine()返回null。如果我尝试正常运行它,它会锁定。

编辑3:感谢格雷的回答解决了。输入流在导致锁定的输出之前被关闭。它需要反过来。首先关闭输出流,然后通知输入流该流已关闭并解除阻塞该readLine()方法。

4

4 回答 4

3

它怎么知道它已经到达了流的末端?它使用什么字符序列来确定这一点。

答案取决于操作系统,但我熟悉的操作系统没有读取 EOF 字符。操作系统将指示流(文件描述符)已达到 EOF 的返回值返回给底层调用者。JVM 看到返回值并根据方法向or调用者返回适当的返回 ( null, -1, ...) 。InputStreamReader

我想模拟发送相同的字符序列以正确关闭另一个使用 PipedStreams 的连接。

如果您正在从 a 读取,PipedReader则关闭关联的PipedWriter. Readeror然后InputStream将适当的 EOF 值返回给调用者。

编辑:

由于您IncomingEventThread正在阅读incomingEventIn,该disconnect()方法应该关闭第incomingEventOut一个。线程应该关闭内部本身。然后你应该关闭响应。

不会有线程调用disconnect(...)。它应该只关闭它的读取器和写入器,而不是所有的流。

于 2013-07-22T21:49:08.840 回答
2

看看这个问题: 文件流文件结尾的字符是什么?

于 2013-07-22T21:45:38.493 回答
2

从您的角度来看,只需调用您用来连接到测试close的那个。PipedOutputStream

实际close的套接字由客户端和服务器上的 TCP 堆栈执行。

这应该可以(请注意,您不能在同一个线程上读取/写入管道流,因此有 2 个方法和一个线程创建):

void runTest ( final PipedInputStream sink ) throws Exception
{
    try( final PipedOutputStream stream = new PipedOutputStream( sink ) )
    {
        try ( final OutputStreamWriter swriter = 
              new OutputStreamWriter( stream, "UTF-8" )
        )
        {
            try ( final PrintWriter writer = new PrintWriter( swriter ) )
            {
                writer.println( "Hello" );

                writer.println( "World!" );
            }
        }
    }
}

void test ( final PipedInputStream sink ) throws InterruptedException
{
    final Thread outputThread =
        new Thread(
            new Runnable ( )
            {
                @Override
                public void run ( )
                {
                    try
                    {
                        runTest( sink );
                    }
                    catch ( final Exception ex )
                    {
                        throw new RuntimeException( ex );
                    }
                }

            }
        );

    outputThread.start( );

    outputThread.join( );
}
于 2013-07-22T21:50:18.020 回答
2

没有一个。操作系统通过文件大小、TCP FIN 位或其他取决于源的带外机制知道流何时到达其末尾。我知道的唯一例外是终端驱动程序在通过键盘键入时将 Ctrl/d 或 Ctrl/z 识别为 EOF,但同样是操作系统,而不是 Java 流或阅读器。

于 2013-07-22T22:04:00.590 回答