1

我知道我可以简单地CountDownLatch直接使用,但是,作为练习并且为了Phaser更好地理解,我想使用它而不是COuntDownLatch.

因此,我将创建 N 个等待者,以及一个需要翻转闩锁的线程。所有等待者,如果在翻转之前到达,将阻塞,但是在锁存器倒计时之后,所有后续的await()立即返回。

Phaser不知道如何实现这一点......屏障很容易,因为我们有 N + 1 个线程,每个线程都在等待。然而,为了确保在第一阶段之后没有线程将等待,不知何故让我望而却步。

我能想出的唯一方法,不是很好,如下:

Phaser p = new Phaser(1);
int phase = p.getPhase();
....
// thread that awaits()
new Thread(new Runnable() {
    ....
    p.awaitAdvance(phase)
}

另一个线程只是将移相器推进到下一个阶段。这并不理想,因此任何指针都会受到赞赏。

4

2 回答 2

0

TL;DR:在这种情况下Phaser.arriveAndDeregister(),用于向移相器的等待者发出非阻塞信号,这对应于 operation CountDownLatch.countDown()

PhaserCountDownLatch首先要澄清的是 a通常Phaser 不能对 a 进行编码CountDownLatch。与 a 同步的任务Phaser必须相互等待(all-to-all 同步)。在 aCountDownLatch中,有一组任务等待其他任务打开闩锁。

PhaserCyclicBarrier这两种机制都用于全对全同步。它们之间的两个区别是: 1)Phaser使用移相器的任务集可能会在移相器的生命周期中增长,而CyclicBarrier参与者的数量是固定的;2) 使用Phasers,任务可以通知其他成员(参与者)并且只要它从该阶段取消注册就不会等待,而所有使用循环屏障的任务只能等待和通知。

用编码CountDownLatcha Phaser要使用移相器对 CountDownLatch(1) 进行编码,您需要记住以下几点:

  1. 聚会人数 = 服务员人数 + 1:已注册的聚会人数,无论是 vianew Phaser(PARTIES_COUNT)还是 via Phaser.register
  2. CountDown.await()=Phaser.arriveAndAwaitAdvance()
  3. CountDown.countDown()=Phaser.arriveAndDeregister()

例子。假设您希望子任务等待父任务信号。使用CountDownLatch你会写:

import java.util.concurrent.*;

class CountDownExample {
    public static void main(String[] args) throws Exception {
        CountDownLatch l = new CountDownLatch(1);
        new Thread(() -> {
            try {
                l.await();
                System.out.println("Child: running");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();
        System.out.println("Parent: waiting a bit.");
        Thread.sleep(100);
        l.countDown();
    }
}

使用 aPhaser你会写:

import java.util.concurrent.*;

class PhaserExample {
    public static void main(String[] args) throws Exception {
        Phaser ph = new Phaser(2); // 2 parties = 1 signaler and 1 waiter
        new Thread(() -> {
            ph.arriveAndAwaitAdvance();
            System.out.println("Child: running");
        }).start();
        System.out.println("Parent: waiting a bit.");
        Thread.sleep(100);
        ph.arriveAndDeregister();
    }
}

您可能想查看这篇文章以获取另一个示例

于 2017-11-22T21:33:05.310 回答
0

我不同意@tiago-cogumbreiro 的回答。事实上,Phaser可以用来模拟CountDownLatch.

arrive()您可以使用 Phaser 的方法来实现它。它不会等待其他线程到达。因此,CountDownLatch可以实现行为。

PhaserarriveAndAwaitAdvance()使调用它的线程等待,直到其他线程/方到达。因此,该方法可用于模拟CyclicBarrier.

下面是模拟CountDownLatch(3)使用行为的源代码Phaser。如果你只使用一个WorkerThread,你可以模拟CountDownLatch(1)

如果将arrive()方法替换为arriveAndAwaitAdvance()方法,则下面的代码将模拟CyclicBarrier.

public class PhaserAsCountDownLatch {

    static Phaser phaser = new Phaser(3);

    public static void main(String[] args) {
     
        new WorkerThread2().start();
        new WorkerThread3().start();
        new WorkerThread().start();
        
        //waits till phase 0 is completed and phase is advanced to next.
        phaser.awaitAdvance(0); 
        
        System.out.println("\nFROM MAIN THREAD: PHASE 0 COMPLETED");
        System.out.println("FROM MAIN THREAD: PHASE ADVANCED TO 1");
        System.out.println("MAIN THREAD ENDS HERE\n");
    }

    static class WorkerThread extends Thread{
        @Override
        public void run() {
            for (int i = 1; i < 5; i++) {
                System.out.println("tid: " + Thread.currentThread().getId() +  ", BEFORE ARRIVING val is: " + i);
            }
        
            phaser.arrive();
        
            System.out.println("ARRIVED tid: " + Thread.currentThread().getId());
        }
    }

    static class WorkerThread2 extends Thread{
        @Override
        public void run() {
        
            //won't wait for other threads to arrive. Hence, CountDownLatch behaviour can be achieved
            phaser.arrive();
        
            System.out.println("ARRIVED tid: " + Thread.currentThread().getId());
            for (int i = 200; i < 231; i++) {
                System.out.println("tid: " + Thread.currentThread().getId() +  " AFTER ARRIVING. val is: " + i);
            }
        }
    }

    static class WorkerThread3 extends Thread{
        @Override
        public void run() {
        
            //won't wait for other threads to arrive. Hence, CountDownLatch behaviour can be achieved
            phaser.arrive();
        
            System.out.println("ARRIVED tid: " + Thread.currentThread().getId());
            for (int i = 300; i < 331; i++) {
                System.out.println("tid: " + Thread.currentThread().getId() +  " AFTER ARRIVING. val is: " + i);
            }
        }
    }
}
于 2020-11-14T20:22:22.213 回答