// multiple server instances (replicas), coordinated using MsgCoordinationService
public class Server {
ConcurrentHashMap<TxID,Future<Msg>> local_registry = new ...
MsgCoordinationService coordination_service = new ..
...
// Socket instance to communicate with a client...
public void accept(Socket s) {
new Thread(new Worker(s)).start();
}
// propose msg to coordination service, register demand to respond to client in local registry
public Future<Msg> register(Msg m) {
FutureMsg f = new MsgFuture(); // Future handle w. reference to an empty Msg object [= response]
TxID uniqueID = coordination_service.propose(s); // transaction ID
local_registry.add(uniqueID, f);
return f;
}
// called by coordination service, guaranteeing a global order on msg deliveries
public synchronized void deliver(TxID id, Msg m) {
... process Msg object [request]
... if local_registry.contains(id), 'compile' response
(using the Msg object from FutureMsg f, f.get() - f.isDone() when a certain Msg flag has been set)
___ now:
... notify waiting 'Worker' threads to check whether their 'Future' object isDone()
}
private class Worker implements Runnable {
...
public void run() {
...
Future<Msg> f = Server.this.register(request); // obtained through Socket s
while(!f.isDone())
wait();
response = f.get();
...
}
}
}
我正在实现一个复制服务[多个服务器,客户端通信 w。单个服务器实例,创建/更新/删除操作将通过协调服务分发,该协调服务保证消息传递的全局顺序]。
一旦客户端与服务器实例建立新连接,所有通信都通过专用Worker
实例进行引导[该实例在本地处理读取请求,并使用广播 C/U/D 操作Server.this.register(...)
]。
register
它本身基本上记录未来本地处理/回复的请求 - 并将Msg
对象转发给协调服务。
服务通过 , ... 重新传递Msg
对象deliver
,在处理完封装的任务后,Worker
将通知最初收到客户端请求的实例交出相应的响应。
由于某些原因,我的设计似乎被打破了...... - w/o synchronized(this)
[in Worker#run()
],wait()
不会等待;,synchronized(this)
a notifyAll()
inServer#deliver(...)
不会释放“阻塞”Worker
实例上的锁。
长话短说:事实证明,我需要你的帮助......要么(a):了解基础知识wait/notify/notifyAll
要么(b):改进我的设计......要么(c):(a)和(b )。