1

我正在构建一个系统,其中调用线程的进度取决于两个变量的状态。一个变量由外部源(与客户端线程分开)偶尔更新,并且多个客户端线程在两个变量的条件下阻塞。系统是这样的

TypeB waitForB() { // Can be called by many threads.
    synchronized (B) {
        while (A <= B) { B.wait(); }
        A = B;
        return B;
    {
}

void updateB(TypeB newB) { // Called by one thread.
    synchronized (B) {
        B.update(newB);
        B.notifyAll(); // All blocked threads must receive new B.
    }
}

我需要所有被阻塞的线程在更新后接收 B 的新值。但问题是一旦单个线程完成并更新 A,等待条件再次变为真,因此其他一些线程被阻塞并且不会接收 B 的新值。有没有办法确保只有最后一个线程在 B 更新 A 上被阻止,还是以其他方式获得此行为?

4

3 回答 3

0

我的建议是使用基于事件的方法,线程想知道新B值只需注册更改!并且单线程只是调用(触发)它们。
像这样的东西。
首先声明事件标志。

interface EventListener{
void onUpdate(TypeB oldOne,TypeB newOne);
}

然后有一个作为监听器的实现。

class ManyThread implements EventListener,Runnable{
...
private TypeA a;
  synchronized void onUpdate(TypeB oldOne,TypeB newOne){
    if(!oldOne.equals(newOne)){a=newOne;this.notify();}
  }

  public ManyThread(){SingleThread.registerListener(this);}
  public synchronized void run(){
     this.wait();//waiting for an event!
     //some business
  }
...
}

然后提供事件发布者。

final class EventMgr{//would be as a singleton  guy too
  private EventMgr(){}
  static private java.util.List<EventListener> li=new java.util.ArrayList<EventListener>();
  static synchronized public void registerListener(EventListener e){li.add(e);}
  static synchronized void triggerListeners(TypeB oldOne,TypeB newOne){
    for(EventListener e:li){e.onUpdate(oldOne,newOne)}
  }
}

EventMgr和简单的触发听众

class SingleThread{
   TypeB oldOne,B;
   void updateB(TypeB newB) { // Called by one thread.
      synchronized (B) {
        oldOne=B.clone();
        B.update(newB);
        //B.notifyAll();
        EventMgr.triggerListeners(oldOne,B);
      }
   }
}
于 2013-11-09T07:31:21.850 回答
0

我有以下想法:为了保持线程计数器等待 B 的“好”值,首先唤醒它们将缓存该好值并让其他读者在那一刻阅读它。我们将新读者排除在等待循环之外,直到所有前一轮线程完成。

以下是代码大纲:

final AtomicInteger A = new AtomicInteger(-1), B = new AtomicInteger(-1);
int cachedB = -1;

int readersCount;

int waitForB() throws InterruptedException { // Can be called by many threads.
    synchronized (B) {
        while (cachedB != -1) B.wait();

        readersCount ++;

        while (A.get() <= B.get()) { B.wait(); }

        if (cachedB == -1) {
            cachedB = B.get();
            A.set(B.get());

            readersCount--;
            if (readersCount == 0) { cachedB = -1; B.notifyAll(); }

            return B.get();
        } else {
            int ret = cachedB;

            readersCount--;
            if (readersCount == 0) { cachedB = -1; B.notifyAll(); }

            return ret;
        }
    }
}

void updateB(int newB) { // Called by one thread.
    synchronized (B) {
        B.set(newB);
        B.notifyAll(); // All blocked threads must receive new B.
    }
}
于 2013-11-09T21:04:22.490 回答
0

我不确定这是否是 100% 线程安全的,但我还没有发现任何问题。这个想法是这样的:

CyclicBarrier barrier;
AtomicInteger count = 0;

TypeB waitForB() { // Can be called by many threads.
    synchronized (B) {
        count++;
        while (A <= B) { B.wait(); }
        count--;
    {
    if (barrier != null) { barrier.await(); }
    return B;
}

class UpdateA implements Runnable {
    void run() {
        A = B;
    }
}

void updateB(TypeB newB) { // Called by one thread.
    synchronized (B) {
        B.update(newB);
        barrier = new CyclicBarrier(count, new UpdateA);
        B.notifyAll(); // All blocked threads must receive new B.
    }
}
于 2013-11-10T23:02:58.230 回答