0
// multiple server instances (replicas), coordinated using MsgCoordinationService
public class Server {

    ConcurrentHashMap<TxID,Future<Msg>> local_registry = new ...
    MsgCoordinationService coordination_service = new .. 

    ...

    // Socket instance to communicate with a client...
    public void accept(Socket s) {
        new Thread(new Worker(s)).start();
    }

    // propose msg to coordination service, register demand to respond to client in local registry
    public Future<Msg> register(Msg m) {
        FutureMsg f = new MsgFuture(); // Future handle w. reference to an empty Msg object [= response]
        TxID uniqueID = coordination_service.propose(s); // transaction ID
        local_registry.add(uniqueID, f);
        return f;
    }

    // called by coordination service, guaranteeing a global order on msg deliveries
    public synchronized void deliver(TxID id, Msg m) {
        ... process Msg object [request]
        ... if local_registry.contains(id), 'compile' response
            (using the Msg object from FutureMsg f, f.get() - f.isDone() when a certain Msg flag has been set)

        ___ now:
        ... notify waiting 'Worker' threads to check whether their 'Future' object isDone()
    }

    private class Worker implements Runnable {
        ...
        public void run() {
            ...
            Future<Msg> f = Server.this.register(request); // obtained through Socket s

            while(!f.isDone())
                wait();

            response = f.get();
            ...
        }
    }
}

我正在实现一个复制服务[多个服务器,客户端通信 w。单个服务器实例,创建/更新/删除操作将通过协调服务分发,该协调服务保证消息传递的全局顺序]。

一旦客户端与服务器实例建立新连接,所有通信都通过专用Worker实例进行引导[该实例在本地处理读取请求,并使用广播 C/U/D 操作Server.this.register(...)]。

register它本身基本上记录未来本地处理/回复的请求 - 并将Msg对象转发给协调服务。

服务通过 , ... 重新传递Msg对象deliver,在处理完封装的任务后,Worker将通知最初收到客户端请求的实例交出相应的响应。

由于某些原因,我的设计似乎被打破了...... - w/o synchronized(this)[in Worker#run()],wait()不会等待;,synchronized(this)a notifyAll()inServer#deliver(...)不会释放“阻塞”Worker实例上的锁。

长话短说:事实证明,我需要你的帮助......要么(a):了解基础知识wait/notify/notifyAll要么(b):改进我的设计......要么(c):(a)(b )。

4

1 回答 1

2

线程调用wait/notify需要锁定调用这些方法的对象,否则会出现异常。在一般形式中,假设一个任意对象:

final Object obj = new Object();
...

synchronized(obj) {
    while(/* condition */) {
       obj.wait();
    }
}

wait你没有被释放的原因notify是你this.wait()在一个与你调用的对象不同的对象内部进行操作notify,因此它们没有配对。您需要对两个调用使用相同的实例。您this.wait()在 的实例中Worker执行和this.notifyAll()在 的实例中执行Server,因此this不引用同一个对象。

您应该创建一个跨类和线程可见的同步对象并在该对象上进行同步。

于 2012-07-17T19:16:38.887 回答