在这个SO 答案中,Sorrow 声明如下:
我推荐使用与 Selector 和 SelectionKey 连接的 java.nio.channels.SocketChannel。这个解决方案有点基于事件,但比普通的套接字更复杂。
如果您决定采用该解决方案,您将在链接答案中找到代码示例。
但是,如果你在谈论java.net.Socket
那时,不,没有事件。我喜欢JTeagle对类似问题的回答:
这通常是通过为客户端生成一个单独的线程来完成的,该线程不断地从流中对 read() 进行阻塞调用——这样,一旦数据可用,read() 调用就会解除阻塞并可以对其接收到的内容进行操作(“事件触发'),然后它返回阻塞等待下一个事件。
而且根据我的经验,这主要是在 Java 中处理套接字的方式。我写了一个基于事件的套接字的实现。由于读取是可阻塞的,因此很可能不需要线程来阻止您的主程序:
public class ObservableSocket extends Thread {
private final Socket socket;
private final ArrayList<ObservableSocketListener> listeners;
private volatile boolean isReading;
private int BUFFER_SIZE = 1024;
public ObservableSocket(String host, int port) throws UnknownHostException, IOException {
this.socket = new Socket(host, port);
this.listeners = new ArrayList<ObservableSocketListener>(1);
isReading = true;
this.start();
}
public void addListener(ObservableSocketListener l) {
if (!listeners.contains(l)) {
listeners.add(l);
}
}
public void removeListener(ObservableSocketListener l) {
if (!listeners.contains(l)) {
listeners.remove(l);
}
}
public void die() {
isReading = false;
try {
this.join();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
public void write(byte[] data) throws IOException {
socket.getOutputStream().write(data);
socket.getOutputStream().flush();
}
private byte[] getData(byte[] buffer, int red) {
byte[] redData = new byte[red];
System.arraycopy(buffer, 0, redData, 0, red);
return redData;
}
@Override
public void run() {
byte[] buffer = new byte[BUFFER_SIZE];
int red;
ObservableSocketEvent event;
try {
while (isReading && (red = socket.getInputStream().read(buffer)) > -1) {
event = new ObservableSocketEvent(this, getData(buffer, red));
for (ObservableSocketListener l : listeners) {
l.dataAvailable(event);
}
}
}
catch (Exception exception) {
event = new ObservableSocketEvent(this, exception);
for (ObservableSocketListener l : listeners) {
l.errorOccured(event);
}
}
finally {
if (socket != null) {
try {
socket.close();
for (ObservableSocketListener l : listeners) {
l.closed(new ObservableSocketEvent(this));
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
}
这是您需要实现的侦听器类:
public interface ObservableSocketListener extends EventListener {
public void dataAvailable(ObservableSocketEvent event);
public void errorOccured(ObservableSocketEvent event);
public void closed(ObservableSocketEvent event);
}
和事件类:
public class ObservableSocketEvent extends EventObject {
private final byte[] data;
private final Exception exception;
public ObservableSocketEvent(Object source) {
super(source);
this.data = null;
this.exception = null;
}
public ObservableSocketEvent(Object source, byte[] data) {
super(source);
this.data = data;
this.exception = null;
}
public ObservableSocketEvent(Object source, Exception exception) {
super(source);
this.data = null;
this.exception = exception;
}
public byte[] getData() {
return data;
}
public Exception getException() {
return exception;
}
}
我制作了一个服务器来生成一些随机数据来测试这段代码,这就是我在客户端的类主方法中使用它的方式:
ObservableSocket observableSocket = new ObservableSocket("localhost", 3339);
observableSocket.addListener(new ObservableSocketListener() {
@Override
public void dataAvailable(ObservableSocketEvent event) {
System.out.println("data received: "+new String(event.getData()));
}
@Override
public void closed(ObservableSocketEvent event) {
System.out.println("closing socket");
}
@Override
public void errorOccured(ObservableSocketEvent event) {
System.out.println("error occured");
event.getException().printStackTrace();
}
});
Thread.currentThread().sleep(10000);
observableSocket.die();
它输出:
data received: data 0
data received: data 1
data received: data 2
data received: data 3
data received: data 4
closing socket // thread is alive here
BUILD SUCCESSFUL (total time: 10 seconds) // thread dies here
就我的测试而言,sleep
需要 in 客户端,因为该die
方法:
没有睡眠,测试客户端立即完成(die 方法有效)。如果没有 die 方法,ObservableSocket 线程在测试结束后仍然存在。
使用此代码时,您应该注意两件事:
- 实例化后
ObservableSocket
立即Socket
连接并Thread
启动 a。
- 您必须
die
从不是ObservableSocket
线程的线程调用该方法(例如,不要从该类中调用该方法)