20

我需要直接等同于 的东西CountDownLatch,但可以重置(保持线程安全!)。我不能使用经典的同步结构,因为它们在这种情况下根本不起作用(复杂的锁定问题)。目前,我正在创建许多CountDownLatch对象,每个对象都替换了前一个对象。我相信这是在 GC 的年轻一代中发生的(由于对象的绝对数量)。您可以在下面看到使用锁存器的代码(它java.net是 ns-3 网络模拟器接口模拟的一部分)。

一些想法可能是尝试CyclicBarrier(JDK5+)或Phaser(JDK7)

我可以测试代码并回复找到解决此问题的任何人,因为我是唯一可以将其插入正在运行的系统中以查看会发生什么的人:)

/**
 *
 */
package kokunet;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kokuks.IConnectionSocket;
import kokuks.KKSAddress;
import kokuks.KKSSocket;
import kokuks.KKSSocketListener;

/**
 * KSelector
 * @version 1.0
 * @author Chris Dennett
 */
public class KSelector extends SelectorImpl {
    // True if this Selector has been closed
    private volatile boolean closed = false;

    // Lock for close and cleanup
    final class CloseLock {}
    private final Object closeLock = new CloseLock();

    private volatile boolean selecting = false;
    private volatile boolean wakeup = false;

    class SocketListener implements KKSSocketListener {
        protected volatile CountDownLatch latch = null;

        /**
         *
         */
        public SocketListener() {
            newLatch();
        }

        protected synchronized CountDownLatch newLatch() {
            return this.latch = new CountDownLatch(1);
        }

        protected synchronized void refreshReady(KKSSocket socket) {
            if (!selecting) return;

            synchronized (socketToChannel) {
                SelChImpl ch = socketToChannel.get(socket);
                if (ch == null) {
                    System.out.println("ks sendCB: channel not found for socket: " + socket);
                    return;
                }
                synchronized (channelToKey) {
                    SelectionKeyImpl sk = channelToKey.get(ch);
                    if (sk != null) {
                        if (handleSelect(sk)) {
                            latch.countDown();
                        }
                    }
                }
            }
        }
        @Override
        public void connectionSucceeded(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void connectionFailed(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void dataSent(KKSSocket socket, long bytesSent) {
            refreshReady(socket);
        }
        @Override
        public void sendCB(KKSSocket socket, long bytesAvailable) {
            refreshReady(socket);
        }
        @Override
        public void onRecv(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void newConnectionCreated(KKSSocket socket, KKSSocket newSocket, KKSAddress remoteaddress) {
            refreshReady(socket);
        }
        @Override
        public void normalClose(KKSSocket socket) {
            wakeup();
        }
        @Override
        public void errorClose(KKSSocket socket) {
            wakeup();
        }
    }

    protected final Map<KKSSocket, SelChImpl>        socketToChannel = new HashMap<KKSSocket, SelChImpl>();
    protected final Map<SelChImpl, SelectionKeyImpl> channelToKey    = new HashMap<SelChImpl, SelectionKeyImpl>();
    protected final SocketListener currListener = new SocketListener();
    protected Thread selectingThread = null;

    SelChImpl getChannelForSocket(KKSSocket s) {
        synchronized (socketToChannel) {
            return socketToChannel.get(s);
        }
    }

    SelectionKeyImpl getSelKeyForChannel(KKSSocket s) {
        synchronized (channelToKey) {
            return channelToKey.get(s);
        }
    }

    protected boolean markRead(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_READ);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markWrite(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_WRITE);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markAccept(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_ACCEPT);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markConnect(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_CONNECT);
            return selectedKeys.add(impl);
        }
    }

    /**
     * @param provider
     */
    protected KSelector(SelectorProvider provider) {
        super(provider);
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implClose()
     */
    @Override
    protected void implClose() throws IOException {
        provider().getApp().printMessage("implClose: closed: " + closed);
        synchronized (closeLock) {
            if (closed) return;
            closed = true;
            for (SelectionKey sk : keys) {
                provider().getApp().printMessage("dereg1");
                deregister((AbstractSelectionKey)sk);
                provider().getApp().printMessage("dereg2");
                SelectableChannel selch = sk.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
            implCloseInterrupt();
        }
    }

    protected void implCloseInterrupt() {
        wakeup();
    }

    private boolean handleSelect(SelectionKey k) {
        synchronized (k) {
            boolean notify = false;

            if (!k.isValid()) {
                k.cancel();
                ((SelectionKeyImpl)k).channel.socket().removeListener(currListener);
                return false;
            }

            SelectionKeyImpl ski = (SelectionKeyImpl)k;

            if ((ski.interestOps() & SelectionKeyImpl.OP_READ) != 0) {
                if (ski.channel.socket().getRxAvailable() > 0) {
                    notify |= markRead(ski);
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_WRITE) != 0) {
                if (ski.channel.socket().getTxAvailable() > 0) {
                    notify |= markWrite(ski);
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_CONNECT) != 0) {
                if (!ski.channel.socket().isConnectionless()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (!ski.channel.socket().isAccepting() && !cs.isConnecting() && !cs.isConnected()) {
                        notify |= markConnect(ski);
                    }
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_ACCEPT) != 0) {
                //provider().getApp().printMessage("accept check: ski: " + ski + ", connectionless: " + ski.channel.socket().isConnectionless() + ", listening: " + ski.channel.socket().isListening() + ", hasPendingConn: " + (ski.channel.socket().isConnectionless() ? "nope!" : ((IConnectionSocket)ski.channel.socket()).hasPendingConnections()));
                if (!ski.channel.socket().isConnectionless() && ski.channel.socket().isListening()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (cs.hasPendingConnections()) {
                        notify |= markAccept(ski);
                    }
                }
            }
            return notify;
        }
    }

    private boolean handleSelect() {
        boolean notify = false;

        // get initial status
        for (SelectionKey k : keys) {
            notify |= handleSelect(k);
        }

        return notify;
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#doSelect(long)
     */
    @Override
    protected int doSelect(long timeout) throws IOException {
        processDeregisterQueue();

        long timestartedms = System.currentTimeMillis();

        synchronized (selectedKeys) {
            synchronized (currListener) {
                wakeup = false;
                selectingThread = Thread.currentThread();
                selecting = true;
            }
            try {
                handleSelect();

                if (!selectedKeys.isEmpty() || timeout == 0) {
                    return selectedKeys.size();
                }

                //TODO: useless op if we have keys available
                for (SelectionKey key : keys) {
                    ((SelectionKeyImpl)key).channel.socket().addListener(currListener);
                }
                try {
                    while (!wakeup && isOpen() && selectedKeys.isEmpty()) {
                        CountDownLatch latch = null;
                        synchronized (currListener) {
                            if (wakeup || !isOpen() || !selectedKeys.isEmpty()) {
                                break;
                            }
                            latch = currListener.newLatch();
                        }
                        try {
                            if (timeout > 0) {
                                long currtimems = System.currentTimeMillis();
                                long remainingMS = (timestartedms + timeout) - currtimems;

                                if (remainingMS > 0) {
                                    latch.await(remainingMS, TimeUnit.MILLISECONDS);
                                } else {
                                    break;
                                }
                            } else {
                                latch.await();
                            }
                        } catch (InterruptedException e) {

                        }
                    }
                    return selectedKeys.size();
                } finally {
                    for (SelectionKey key : keys) {
                        ((SelectionKeyImpl)key).channel.socket().removeListener(currListener);
                    }
                }
            } finally {
                synchronized (currListener) {
                    selecting = false;
                    selectingThread = null;
                    wakeup = false;
                }
            }
        }
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implRegister(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implRegister(SelectionKeyImpl ski) {
        synchronized (closeLock) {
            if (closed) throw new ClosedSelectorException();
            synchronized (channelToKey) {
                synchronized (socketToChannel) {
                    keys.add(ski);
                    socketToChannel.put(ski.channel.socket(), ski.channel);
                    channelToKey.put(ski.channel, ski);
                }
            }
        }

    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implDereg(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        synchronized (channelToKey) {
            synchronized (socketToChannel) {
                keys.remove(ski);
                socketToChannel.remove(ski.channel.socket());
                channelToKey.remove(ski.channel);

                SelectableChannel selch = ski.channel();

                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
        }
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#wakeup()
     */
    @Override
    public Selector wakeup() {
        synchronized (currListener) {
            if (selecting) {
                wakeup = true;
                selecting = false;
                selectingThread.interrupt();
                selectingThread = null;
            }
        }
        return this;
    }
}

干杯,
克里斯

4

9 回答 9

23

我复制CountDownLatch并实现了一个reset()将内部Sync类重置为其初始状态(起始计数)的方法:) 似乎工作正常。不再创建不必要的对象 \o/ 因为sync是私有的,所以不能子类化。嘘。

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 *
 * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
 * The {@link #await await} methods block until the current count reaches
 * zero due to invocations of the {@link #countDown} method, after which
 * all waiting threads are released and any subsequent invocations of
 * {@link #await await} return immediately.  This is a one-shot phenomenon
 * -- the count cannot be reset.  If you need a version that resets the
 * count, consider using a {@link CyclicBarrier}.
 *
 * <p>A {@code CountDownLatch} is a versatile synchronization tool
 * and can be used for a number of purposes.  A
 * {@code CountDownLatch} initialized with a count of one serves as a
 * simple on/off latch, or gate: all threads invoking {@link #await await}
 * wait at the gate until it is opened by a thread invoking {@link
 * #countDown}.  A {@code CountDownLatch} initialized to <em>N</em>
 * can be used to make one thread wait until <em>N</em> threads have
 * completed some action, or some action has been completed N times.
 *
 * <p>A useful property of a {@code CountDownLatch} is that it
 * doesn't require that threads calling {@code countDown} wait for
 * the count to reach zero before proceeding, it simply prevents any
 * thread from proceeding past an {@link #await await} until all
 * threads could pass.
 *
 * <p><b>Sample usage:</b> Here is a pair of classes in which a group
 * of worker threads use two countdown latches:
 * <ul>
 * <li>The first is a start signal that prevents any worker from proceeding
 * until the driver is ready for them to proceed;
 * <li>The second is a completion signal that allows the driver to wait
 * until all workers have completed.
 * </ul>
 *
 * <pre>
 * class Driver { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch startSignal = new CountDownLatch(1);
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       new Thread(new Worker(startSignal, doneSignal)).start();
 *
 *     doSomethingElse();            // don't let run yet
 *     startSignal.countDown();      // let all threads proceed
 *     doSomethingElse();
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class Worker implements Runnable {
 *   private final CountDownLatch startSignal;
 *   private final CountDownLatch doneSignal;
 *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
 *      this.startSignal = startSignal;
 *      this.doneSignal = doneSignal;
 *   }
 *   public void run() {
 *      try {
 *        startSignal.await();
 *        doWork();
 *        doneSignal.countDown();
 *      } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }
 *
 * </pre>
 *
 * <p>Another typical usage would be to divide a problem into N parts,
 * describe each part with a Runnable that executes that portion and
 * counts down on the latch, and queue all the Runnables to an
 * Executor.  When all sub-parts are complete, the coordinating thread
 * will be able to pass through await. (When threads must repeatedly
 * count down in this way, instead use a {@link CyclicBarrier}.)
 *
 * <pre>
 * class Driver2 { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *     Executor e = ...
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       e.execute(new WorkerRunnable(doneSignal, i));
 *
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class WorkerRunnable implements Runnable {
 *   private final CountDownLatch doneSignal;
 *   private final int i;
 *   WorkerRunnable(CountDownLatch doneSignal, int i) {
 *      this.doneSignal = doneSignal;
 *      this.i = i;
 *   }
 *   public void run() {
 *      try {
 *        doWork(i);
 *        doneSignal.countDown();
 *      } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }
 *
 * </pre>
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code countDown()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions following a successful return from a corresponding
 * {@code await()} in another thread.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ResettableCountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        public final int startCount;

        Sync(int count) {
            this.startCount = count;
            setState(startCount);
        }

        int getCount() {
            return getState();
        }

        public int tryAcquireShared(int acquires) {
            return getState() == 0? 1 : -1;
        }

        public boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

        public void reset() {
             setState(startCount);
        }
    }

    private final Sync sync;

    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public ResettableCountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void reset() {
        sync.reset();
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
     * or the specified waiting time elapses.
     *
     * <p>If the current count is zero then this method returns immediately
     * with the value {@code true}.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of three things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     *
     * <p>If the count reaches zero then the method returns with the
     * value {@code true}.
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then the value {@code false}
     * is returned.  If the time is less than or equal to zero, the method
     * will not wait at all.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return {@code true} if the count reached zero and {@code false}
     *         if the waiting time elapsed before the count reached zero
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * Returns the current count.
     *
     * <p>This method is typically used for debugging and testing purposes.
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }

    /**
     * Returns a string identifying this latch, as well as its state.
     * The state, in brackets, includes the String {@code "Count ="}
     * followed by the current count.
     *
     * @return a string identifying this latch, as well as its state
     */
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}
于 2011-07-06T13:11:21.083 回答
4

基于@Fidel -s 的回答,我做了一个替代 ResettableCountDownLatch。我所做的改变

  • mLatchprivate volatile
  • mInitialCountprivate final
  • simple 的返回类型await()已更改为 void。

否则,原始代码也很酷。因此,这是完整的增强代码:

public class ResettableCountDownLatch {

    private final int initialCount;
    private volatile CountDownLatch latch;

    public ResettableCountDownLatch(int  count) {
        initialCount = count;
        latch = new CountDownLatch(count);
    }

    public void reset() {
        latch = new CountDownLatch(initialCount);
    }

    public void countDown() {
        latch.countDown();
    }

    public void await() throws InterruptedException {
        latch.await();
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}

更新

基于@Systemplanet -s 评论,这里是一个更安全的版本reset()

    // An atomic reference is required because reset() is not that atomic anymore, not even with `volatile`.
    private final AtomicReference<CountDownLatch> latchHolder = new AtomicReference<>();

    public void reset() {
        // obtaining a local reference for modifying the required latch
        final CountDownLatch oldLatch = latchHolder.getAndSet(null);
        if (oldLatch != null) {
            // checking the count each time to prevent unnecessary countdowns due to parallel countdowns
            while (0L < oldLatch.getCount()) {
                oldLatch.countDown();
            }
        }
    }

基本上,这是简单和安全之间的选择。即,如果您愿意将责任转移给代码的客户端,那么将引用设置nullreset().

另一方面,如果你想让这段代码的用户更容易使用,那么你需要使用更多的技巧。

于 2016-10-27T15:12:32.177 回答
4

Phaser有更多选项,我们可以使用它来实现可重置的 countdownLatch。

请从以下网站阅读以下基本概念

https://examples.javacodegeeks.com/core-java/util/concurrent/phaser/java-util-concurrent-phaser-example/

http://netjs.blogspot.in/2016/01/phaser-in-java-concurrency.html

import java.util.concurrent.Phaser;
/**
 * Resettable countdownLatch using phaser
 */
public class PhaserExample {
    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(3); // you can use constructor hint or
                                        // register() or mixture of both
        // register self... so parties are incremented to 4 (3+1) now
        phaser.register();
        //register is one time call for all the phases.
        //means no need to register for every phase             


        int phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);
        new PhaserExample().testPhaser(phaser, 2000);
        new PhaserExample().testPhaser(phaser, 4000);
        new PhaserExample().testPhaser(phaser, 6000);

        // similar to await() in countDownLatch/CyclicBarrier
        // parties are decremented to 3 (4+1) now
        phaser.arriveAndAwaitAdvance(); 
        // once all the thread arrived at same level, barrier opens
        System.out.println("Barrier has broken.");
        phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);

        //second phase
        new PhaserExample().testPhaser(phaser, 2000);
        new PhaserExample().testPhaser(phaser, 4000);
        new PhaserExample().testPhaser(phaser, 6000);
        phaser.arriveAndAwaitAdvance(); 
        // once all the thread arrived at same level, barrier opens
        System.out.println("Barrier has broken.");
        phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);

    }

    private void testPhaser(final Phaser phaser, final int sleepTime) {
        // phaser.register(); //Already constructor hint is given so not
        // required
        new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(sleepTime);
                    System.out.println(Thread.currentThread().getName() + " arrived");
                    // phaser.arrive(); //similar to CountDownLatch#countDown()
                    phaser.arriveAndAwaitAdvance();// thread will wait till Barrier opens
                    // arriveAndAwaitAdvance is similar to CyclicBarrier#await()
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " after passing barrier");
            }
        }.start();
    }
}
于 2017-04-28T15:09:38.353 回答
4

我不确定这是否存在致命缺陷,但我最近遇到了同样的问题,并通过在每次我想重置时简单地实例化一个新的 CountDownLatch 对象来解决它。像这样的东西:

服务员:

bla();
latch.await();
//now the latch has counted down to 0
blabla();

倒计时

foo();
latch.countDown();
//now the latch has counted down to 0
latch = new CountDownLatch(1);
Waiter.receiveReferenceToNewLatch(latch);
bar();

显然这是一个沉重的抽象,但到目前为止它对我有用,并且不需要你修改任何类定义。

于 2018-06-13T11:00:27.453 回答
1

使用移相器。

如果只有一个线程应该工作。你可以加入 AtomicBoolean 和 Phaser

 AtomicBoolean someConditionInProgress = new AtomicBoolean("false"); Phaser onConditionalPhaser = new Phaser(1);


   (some function) if (!someConditionInProgress.compareAndSet(false, true)) {
        try {
           //do something
        } finally {
            someConditionInProgress.set(false);
            //release barier
            onConditionalPhaser.arrive();
        }
    } else {
        onConditionalPhaser.awaitAdvance(onConditionalPhaser.getPhase());
    }
于 2021-10-07T08:41:41.663 回答
0

另一个插入式替代品

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ResettableCountDownLatch {
    int mInitialCount;
    CountDownLatch mLatch;

    public ResettableCountDownLatch(int  count) {
        mInitialCount = count;
        mLatch = new CountDownLatch(count);
    }

    public void reset() {
        mLatch = new CountDownLatch(mInitialCount);
    }

    public void countDown() {
        mLatch.countDown();
    }

    public boolean await() throws InterruptedException {
        boolean result = mLatch.await();
        return result;
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        boolean result = mLatch.await(timeout, unit);
        return result;
    }
}
于 2015-08-12T17:00:47.823 回答
0

看起来您想将异步 I/O 转换为同步。使用异步 I/O 的整个想法是避免使用线程,但 CountDownLatch 需要使用线程。这在你的问题中是一个明显的矛盾。这样你就可以:

  • 继续使用线程并使用同步 I/O 而不是选择器和其他东西。这将更加简单和可靠
  • 继续使用异步 I/0 并放弃 CountDownLatch。然后你需要一个异步库——看看 RxJava、Akka 或 df4j。
  • 继续开发您的项目以获得乐趣。然后您可以尝试使用 java.util.Semaphore 而不是 CountDownLatch,或者使用 synchronized/wait/notify 编写自己的同步类。
于 2018-08-16T11:37:24.640 回答
0
public class ResettableLatch {
private static final class Sync extends AbstractQueuedSynchronizer {

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return getState() == 0 ? 1 : -1;
    }

    public void reset(int count) {
        setState(count);
    }

    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

public ResettableLatch(int count) {
    if (count < 0)
        throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
    sync.releaseShared(1);
}

public long getCount() {
    return sync.getCount();
}

public void reset(int count) {
    sync.reset(count);
}
}

这对我有用。

于 2019-11-16T09:45:18.320 回答
0

根据我从 OP 解释和源代码中能够理解的内容,CountDownLatch对于他要解决的问题,resettable 的概念并不是很充分。CountDownLatch本身的文档提到了 OP 的用例作为简单的门初始化,计数为 1

CountDownLatch初始化为 1 用作简单的开/关锁存器或门:所有调用的线程都await在门处等待,直到它被调用的线程打开countDown

,但CountDownLatch实施并没有朝着这个方向走得更远。

因此,我自己遇到了类似于 OP 的问题,我决定引入一个SimpleGate具有以下属性的类:

  • 许可证数量为 1,这意味着它可以在OnOff状态;

  • 有一个专门的线程,叫Gate Keeper那个只允许shut off或者open upGate;

  • 门禁权可以转让;

  • 打开 Gate 立即允许尝试到come throughGate 的线程执行此操作(这个非常合乎逻辑的特征在其他答案中被忽略了);

  • 由于预计线程争用会很高,因此支持公平性作为选项,这可以减少线程插入的影响。

      public class SimpleGate {
    
          private static class Sync extends AbstractQueuedSynchronizer {
    
             // State
             private static final int SHUT = 1;
             private static final int OPEN = 0;
    
             private boolean fair;
    
             public void setFair(boolean fair) {
                this.fair = fair;
             }
    
             public void shutOff() {
                super.setState(SHUT);
             }
    
             @Override
             protected int tryAcquireShared(int arg) {
                if (fair && super.hasQueuedPredecessors())
                   return -1;
                return super.getState() == OPEN ? 1 : -1;
             }
    
             @Override
             protected boolean tryReleaseShared(int arg) {
                super.setState(OPEN); 
                return true;
             }
    
        }
    
        private Sync sync = new Sync();
        private volatile Thread gateKeeper = Thread.currentThread();
    
        public SimpleGate(){
           this(true);
        }
    
        public SimpleGate(boolean shutOff){
           this(shutOff, false);
        }
    
        public SimpleGate(boolean shutOff, boolean fair){
           if (shutOff)
              sync.shutOff();
           sync.setFair(fair);
        }
    
        public void comeThrough(){
           if (Thread.currentThread() == gateKeeper)
              throw new IllegalStateException("Gate Keeper thread is not supposed to come through the gate");
           sync.acquireShared(0);
        }
    
        public void shutOff(){
           if (Thread.currentThread() != gateKeeper)
              throw new IllegalStateException("Only a Gate Keeper thread is allowed to shut off");
            sync.shutOff();
        }
    
        public void openUp(){
           if (Thread.currentThread() != gateKeeper)
              throw new IllegalStateException("Only a Gate Keeper thread is allowed to open up");
           sync.releaseShared(0);
        }
    
        public void transferOwnership(Thread newGateKeeper){
           this.gateKeeper  = newGateKeeper;
        }
    
        // an addition of waiting interruptibly and waiting for specified amount of time, 
        //if they are needed, is trivial 
    }
    
于 2020-06-29T16:11:47.680 回答