我目前正在尝试实现一个与机器通信的应用程序,它基本上应该按如下方式工作:
- 程序向服务器发送一条消息(在本例中为文件的前 255 个字节)。
- 机器以“消息成功接收”或“错误接收消息”响应进行响应。
- 然后程序必须根据机器的响应决定是否发送下一条消息(下一个 255 字节)或不发送(最后一条消息出错,必须重新开始)。
- 对于程序需要发送的每条消息,依此类推(取决于文件的大小)。
因此,我们考虑让一个线程进行发送,另一个线程进行接收,因为我们有一个 api 来注册一个类作为从机器接收消息的类(仅通过实现一个接口),以及向机器发送消息不是阻塞类型的,因此需要一种等待机器响应的方法,以便程序可以决定在响应到达后做什么。
因此,我们需要以某种方式同步这两个线程,因为可以确定它们将交换多少消息,这使我们尝试使用 CyclicBarrier。这是测试 CyclicBarrier 是否可以帮助我们解决这个问题的代码(程序实际上并没有使用套接字与机器通信,这只是为了测试屏障):
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
public class BlockingTest{
private CyclicBarrier barrier;
class Receiver implements Runnable{
@Override public void run(){
try{
ServerSocket ss = new ServerSocket(8080);
while(!barrier.isBroken()){
System.out.println("Waiting message...");
Socket response = ss.accept();
BufferedReader br = new BufferedReader(new InputStreamReader(
response.getInputStream()));
System.out.printf("Received: %s\n", br.readLine());
barrier.await();
}
}catch(InterruptedException | BrokenBarrierException |
IOException ex){
System.err.println(ex.getMessage());
}
}
}
public BlockingTest(){
this.barrier = new CyclicBarrier(2, new Runnable(){
@Override public void run(){
System.out.println("done.");
}
});
new Thread(new Receiver()).start();
try{
Socket sender = new Socket("localhost", 8080);
PrintWriter pw = new PrintWriter(sender.getOutputStream(), true);
for(int i = 0; i < 3; i++){
System.out.println("Sending message:");
pw.println("Message!");
this.barrier.await();
}
}catch(InterruptedException | BrokenBarrierException | IOException ex){
System.err.println(ex.getMessage());
}
}
public static void main(String[] arg){
new BlockingTest();
}
}
如果我们只发送一条消息(在 BlockingTest() 构造函数中没有 for 块,只发送消息),则此代码按预期工作,但在添加 for 块后,它不会按预期工作。它只在第一次工作,然后挂起:
Waiting message...
Sending message:
Received: Message!
done.
Waiting message...
Sending message:
问题是:
如何使屏障可重复使用?是自动的还是必须手动完成?
程序挂起是因为我错过了套接字(或屏障代码)吗?