23

A thread can use Object.wait() to block until another thread calls notify() or notifyAll() on that object.

But what if a thread wants to wait until one of multiple objects is signaled? For example, my thread must wait until either a) bytes become available to read from an InputStream or b) an item is added to an ArrayList.

How can the thread wait for either of these events to occur?

EDIT

This question deals with waiting for multiple threads to complete -- my case involves a thread waiting for one of many objects to be singnaled.

4

8 回答 8

24

你进入了一个痛苦的世界。使用更高级别的抽象,例如阻塞消息队列,线程可以从中使用消息,例如“更多可用字节”或“添加项目”。

于 2011-06-07T12:40:19.300 回答
6

他们都可以使用相同的互斥锁。您的消费者正在等待该互斥锁,当第一个可以继续时,其他两个都在该互斥锁上通知。

于 2011-06-07T12:39:35.603 回答
5

一个线程一次不能等待多个对象。

wait()notify()方法是特定于对象的。该wait()方法挂起当前的执行线程,并告诉对象跟踪挂起的线程。该notify()方法告诉对象唤醒它当前正在跟踪的挂起线程。

有用的链接:线程可以在 Java (6) 中一次对两个锁调用 wait() 吗?

于 2011-06-07T12:49:30.623 回答
4

有点晚了,但这是一个非常有趣的问题!看起来您确实可以等待多个条件,具有相同的性能,并且没有额外的线程;这只是定义问题的问题!我花时间在下面的代码提交中写了更详细的解释。根据要求,我将提取抽象:

所以实际上等待多个对象,与等待多个条件相同。但下一步是将您的子条件合并为-net-条件-单个条件-。当条件的任何组成部分使其变为真时,您翻转一个布尔值,并通知锁(就像任何其他等待通知条件一样)。

我的方法

对于任何条件,它只能产生两个值(真和假)。该价值如何产生无关紧要。在您的情况下,您的“功能条件”是两个值之一为真时:(value_a || value_b)。我将这种“功能条件”称为“Nexus-Point”。如果您应用任何复杂条件(无论多么复杂)总是产生简单结果(真或假)的观点,那么您真正要求的是“什么会导致我的净条件变为真?” (假设逻辑是“等到真”)。因此,当一个线程导致您的条件的一个组成部分变为真(在您的情况下将 value_a 或 value_b 设置为真),并且您知道它将导致您想要的 -net- 条件得到满足,然后你可以简化你对经典的方法(因为它翻转一个布尔标志,并释放一个锁)。有了这个概念,您可以应用对象坐标方法来帮助您的整体逻辑清晰:

import java.util.HashSet;
import java.util.Set;

/**
 * The concept is that all control flow operation converge
 * to a single value: true or false. In the case of N
 * components in which create the resulting value, the
 * theory is the same. So I believe this is a matter of
 * perspective and permitting 'simple complexity'. for example:
 *
 * given the statement:
 *      while(condition_a || condition_b || ...) { ... }
 *
 * you could think of it as:
 *      let C = the boolean -resulting- value of (condition_a || condition_b || ...),
 *      so C = (condition_a || condition_b || ...);
 *
 * Now if we were to we-write the statement, in lamest-terms:
 *      while(C) { ... }
 *
 * Now if you recognise this form, you'll notice its just the standard
 * syntax for any control-flow statement?
 *
 *      while(condition_is_not_met) {
 *          synchronized (lock_for_condition) {
 *              lock_for_condition.wait();
 *            }
 *      }
 *
 * So in theory, even if the said condition was evolved from some
 * complex form, it should be treated as nothing more then if it
 * was in the simplest form. So whenever a component of the condition,
 * in which cause the net-condition (resulting value of the complex
 * condition) to be met, you would simply flip the boolean and notify
 * a lock to un-park whoever is waiting on it. Just like any standard
 * fashion.
 *
 * So thinking ahead, if you were to think of your given condition as a
 * function whos result is true or false, and takes the parameters of the states
 * in which its comprised of (  f(...) = (state_a || state_b && state_c), for example )
 * then you would recognize "If I enter this state, in which this I know would
 * cause that condition/lock to become true, I should just flip the switch switch,
 * and notify".
 *
 * So in your example, your 'functional condition' is:
 *      while(!state_a && !state_b) {
 *          wait until state a or state b is false ....
 *      }
 *
 * So armed with this mindset, using a simple/assertive form,
 * you would recognize that the overall question:
 * -> What would cause my condition to be true? : if  state_a is true OR state_b is true
 * Ok... So, that means: When state_a or state_b turn true, my overall condition is met!
 * So... I can just simplify this thing:
 *
 *      boolean net_condition = ...
 *      final Object lock = new Lock();
 *
 *      void await() {
 *          synchronized(lock) {
 *              while(!net_condition) {
 *                  lock.wait();
 *              }
 *           }
 *       }
 *
 * Almighty, so whenever I turn state_a true, I should just flip and notify
 * the net_condition!
 *
 *
 *
 * Now for a more expanded form of the SAME THING, just more direct and clear:
 *
 * @author Jamie Meisch
 */
public class Main {


    /**
     *
     * The equivalent if one was to "Wait for one of many condition/lock to
     * be notify me when met" :
     *
     *      synchronized(lock_a,lock_b,lock_c) {
     *          while(!condition_a || !condition_b || !condition_c) {
     *              condition_a.wait();
     *              condition_b.wait();
     *              condition_c.wait();
     *          }
     *      }
     *
     */
    public static void main(String... args) {

        OrNexusLock lock = new OrNexusLock();
        // The workers register themselves as their own variable as part of the overall condition,
        // in which is defined by the OrNuxusLock custom-implement. Which will be true if any of
        // the given variables are true
        SpinningWarrior warrior_a = new SpinningWarrior(lock,1000,5);
        SpinningWarrior warrior_b = new SpinningWarrior(lock,1000,20);
        SpinningWarrior warrior_c = new SpinningWarrior(lock,1000,50);

        new Thread(warrior_a).start();
        new Thread(warrior_b).start();
        new Thread(warrior_c).start();

        // So... if any one of these guys reaches 1000, stop waiting:
        // ^ As defined by our implement within the OrNexusLock


        try {
            System.out.println("Waiting for one of these guys to be done, or two, or all! does not matter, whoever comes first");
            lock.await();
            System.out.println("WIN: " + warrior_a.value() + ":" + warrior_b.value() + ":" + warrior_c.value());
        } catch (InterruptedException ignored) {
        }

    }


    // For those not using Java 8 :)
    public interface Condition {
        boolean value();
    }

    /**
     * A variable in which the net locks 'condition function'
     * uses to determine its overall -net- state.
     */
    public static class Variable {

        private final Object lock;
        private final Condition con;

        private Variable(Object lock, Condition con) {
            this.lock = lock;
            this.con  = con;
        }

        public boolean value() {
            return con.value();
        }

        //When the value of the condition changes, this should be called
        public void valueChanged() {
            synchronized (lock) {
                lock.notifyAll();
            }
        }

    }



    /**
     *
     * The lock has a custom function in which it derives its resulting
     * -overall- state (met, or not met). The form of the function does
     * not matter, but it only has boolean variables to work from. The
     * conditions are in their abstract form (a boolean value, how ever
     * that sub-condition is met). It's important to retain the theory
     * that complex conditions yeild a simple result. So expressing a
     * complex statement such as ( field * 5 > 20 ) results in a simple
     * true or false value condition/variable is what this approach is
     * about. Also by centerializing the overal logic, its much more
     * clear then the raw -simplest- form (listed above), and just
     * as fast!
     */
    public static abstract class NexusLock {
        private final Object lock;

        public NexusLock() {
            lock = new Object();
        }

        //Any complex condition you can fathom!
        //Plus I prefer it be consolidated into a nexus point,
        // and not asserted by assertive wake-ups
        protected abstract boolean stateFunction();

        protected Variable newVariable(Condition condition) {
            return new Variable(lock, condition);
        }

        //Wait for the overall condition to be met
        public void await() throws InterruptedException {
            synchronized (lock) {
                while (!stateFunction()) {
                    lock.wait();
                }
            }
        }

    }

    // A implement in which any variable must be true
    public static class OrNexusLock extends NexusLock {


        private final Set<Variable> vars = new HashSet<>();

        public OrNexusLock() {
        }


        public Variable newVar(Condition con) {
            Variable var = newVariable(con);
            vars.add(var); //register it as a general component of or net condition       // We should notify the thread since our functional-condition has changed/evolved:
            synchronized (lock) { lock.notifyAll(); }
            return var;
        }

        @Override
        public boolean stateFunction() { //Our condition for this lock
            // if any variable is true: if(var_a || var_b || var_c || ...)

            for(Variable var : vars) {
                if(var.value() == true) return true;
            }
            return false;
        }

    }

    //increments a value with delay, the condition is met when the provided count is reached
    private static class SpinningWarrior implements Runnable, Condition {

        private final int count;
        private final long delay;
        private final Variable var;

        private int tick = 0;

        public SpinningWarrior(OrNexusLock lock, int count, long delay) {
            this.var   = lock.newVar(this);
            this.count = count; //What to count to?
            this.delay = delay;
        }

        @Override
        public void run() {
            while (state_value==false) { //We're still counting up!
                tick++;
                chkState();
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ignored) {
                    break;
                }
            }
        }

        /**
         * Though redundant value-change-notification are OK,
         * its best to prevent them. As such its made clear to
         * that we will ever change state once.
         */
        private boolean state_value = false;
        private void chkState() {
            if(state_value ==true) return;
            if(tick >= count) {
                state_value = true;
                var.valueChanged(); //Our value has changed
            }
        }

        @Override
        public boolean value() {
            return state_value; //We could compute our condition in here, but for example sake.
        }

    }


}
于 2015-07-22T16:34:45.030 回答
2

在这两种情况下锁定同一个对象。在情况 a) 或情况 b) 中调用同一对象上的 notify()。

于 2011-06-07T12:38:34.990 回答
2

您只能在一台显示器上等待。所以通知者必须通知这台监视器。在这种低级同步中没有其他方法。

于 2011-06-07T12:43:57.473 回答
2

在您的情况下,您似乎正在等待来自两个不同来源的“通知”。您可能不必synchronized(object) object.wait()在这两个对象本身上“等待”(如在普通 java 中),但让它们都与队列或其他对象交谈(正如其他答案所提到的,一些阻塞集合,如 LinkedBlockingQueue)。

如果您真的想在两个不同的 java 对象上“等待”,您可以通过应用此答案中的一些原则来做到这一点:https ://stackoverflow.com/a/31885029/32453 (基本上是新建一个线程each 对您正在等待的每个对象进行等待,让它们在通知对象本身时通知主线程)但管理同步方面可能并不容易。

于 2015-08-07T19:28:12.810 回答
0

为了处理给定集合中任何线程的终止而不等待所有线程完成,可以使用专用的公共对象(lastExited如下)作为监视器(wait()和块)notify()synchronized需要进一步的监视器来确保在任何时候最多有一个线程正在退出(notifyExitMutex),并且最多有一个线程正在等待任何线程退出(waitAnyExitMonitor);因此wait()/notify()对总是属于不同的块。

示例(所有进程终止都按照线程完成的顺序处理):

import java.util.Random;

public class ThreadMonitor {

    private final Runnable[] lastExited = { null };

    private final Object notifyExitMutex = new Object();
    public void startThread(final Runnable runnable) {
        (new Thread(new Runnable() { public void run() {
            try { runnable.run(); } catch (Throwable t) { }
            synchronized (notifyExitMutex) {
                synchronized (lastExited) {
                    while (true) {
                        try {
                            if (lastExited[0] != null) lastExited.wait();
                            lastExited[0] = runnable;
                            lastExited.notify();
                            return;
                        }
                        catch (InterruptedException e) { }
                    }
                }
            }
        }})).start();
    }

    private final Object waitAnyExitMutex = new Object();
    public Runnable waitAnyExit() throws InterruptedException {
        synchronized (waitAnyExitMutex) {
            synchronized (lastExited) {
                if (lastExited[0] == null) lastExited.wait();
                Runnable runnable = lastExited[0];
                lastExited[0] = null;
                lastExited.notify();
                return runnable;
            }
        }
    }

    private static Random random = new Random();
    public static void main(String[] args) throws InterruptedException {
        ThreadMonitor threadMonitor = new ThreadMonitor();

        int threadCount = 0;
        while (threadCount != 100) {
            Runnable runnable = new Runnable() { public void run() {
                try { Thread.sleep(1000 + random.nextInt(100)); }
                catch (InterruptedException e) { }
            }};
            threadMonitor.startThread(runnable);
            System.err.println(runnable + " started");
            threadCount++;
        }

        while (threadCount != 0) {
            Runnable runnable = threadMonitor.waitAnyExit();
            System.err.println(runnable + " exited");
            threadCount--;
        }
    }
}
于 2016-02-05T12:38:00.937 回答