1

我正在尝试将一些商业案例映射到循环障碍的使用。假设我们正在进行促销优惠,只有 3 位客户可以获得促销优惠。其余的将无法获得报价。

为了映射这个场景,我使用了 Cyclic Barrier。即使代码正常工作,我也不确定如何处理某些客户无法获得报价的情况。现在,我尝试使用具有超时值的 await() API,以便我可以捕获TimeoutException并让客户知道他无法使用促销优惠。这导致了另一个等待线程的BarrierBrokenException 。

我想知道,我们如何才能优雅地处理这些场景,以便选定的客户获得促销优惠,而那些无法遵循不同代码路径的客户。

我的代码 -

public class CyclicBarrierExample {

 public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
    Thread[] threads = new Thread[5];
    CyclicBarrier barrier = new CyclicBarrier(3, ()->{System.out.println("Barrier limit of 3 reached. 3 threads will get the promotional offer!");});
    Runnable nr = new PromotionRunnable(barrier);

    int i = 0;
    for (Thread t : threads) {
        t = new Thread(nr, "Thread " + ++i);
        t.start();
    }
    System.out.println("main thread has completed");
 }

 private static class PromotionRunnable implements Runnable {
    private final CyclicBarrier barrier;

    public PromotionRunnable(final CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    /*
     * As per the doc, BrokenBarrierException is thrown when another thread timed out while the current thread was waiting.
     * This explains why we are able to see both Timeout and Broken Barrier Exceptions.
     */
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " trying to get the promotional offer!");
        try {
            barrier.await(2000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        } catch (BrokenBarrierException e) {
            System.out.println(Thread.currentThread().getName() + " could not get the promotional offer, due to barrier exception");
            return;
        } catch (TimeoutException e) {
            System.out.println(Thread.currentThread().getName() + " could not get the promotional offer, due to timeout exception");
            return;
        }
        System.out.println(Thread.currentThread().getName() + " got the promotional offer!");
    }
 }
}

其中一次运行的输出 -

  • 线程 1 试图获得促销优惠!
  • 线程 4 试图获得促销优惠!
  • 主线程已完成
  • 线程 3 试图获得促销优惠!
  • 线程 2 试图获得促销优惠!
  • 线程 5 试图获得促销优惠!
  • 障碍达到前三名,他们将获得促销优惠!
  • 线程 2 获得促销优惠!
  • Thread 1 收到促销优惠!
  • Thread 5 收到促销优惠!
  • 由于超时异常,线程 3 无法获得促销优惠
  • 由于屏障异常,线程 4 无法获得促销优惠
4

3 回答 3

1

ACyclicBarrier只会在 3 个客户尝试访问该优惠时跳闸。

因此,如果只有 1 个客户尝试访问它,它将阻止,直到其他 2 个客户也尝试访问它!一旦屏障跳闸,它就会被简单地重置,并且机制刚刚开始。您可以观察是否创建 6 个以上的线程而不是 5 个。

所以CyclicBarrier似乎不是你要找的。

您可能想要计算已经访问该优惠的客户数量并拒绝新客户:

private static class PromotionBarrier {
    private final AtomicBoolean hasAccess = new AtomicBoolean(false);
    private final AtomicLong counter = new AtomicLong(0);
    private final long maxCustomers = 3;
    public boolean hasAccess() {
        if(hasAccess.get()) {
            long value = counter.incrementAndGet();
            if(value <= maxCustomers) {
                return true;
            } else {
                hasAccess.set(false);
                return false;
            }
        }
        return false; 
    }
}

private static class PromotionRunnable implements Runnable {
    private final PromotionBarrier promotionBarrier;

    public PromotionRunnable(final PromotionBarrier promotionBarrier) {
        this.promotionBarrier = barrier;
    }

    @Override
    public void run() {
        if(promotionBarrier.hasAccess()) {
            // Yoohoo I got it!
        } else {
            // Rha I am too late!!
        }
    }
于 2015-08-27T14:25:27.170 回答
0

CyclicBarrier适用于当您有多个线程,并且您希望它们都同时开始做某事时。如果为 N 个线程设置了屏障,那么它会让前 N-1 个线程等待,直到第 N 个线程到达,然后让它们全部重新运行。

那可能不是你想要的。您希望前 3 个线程获得奖励,其余线程空手而归。 CyclicBarrier就是让线程等待某些东西,但是你不希望你的线程等待。

Semaphore也就是让线程等待某些东西。

我喜欢@OldCurmudgeon 关于使用 AtomicInteger 的建议。

设置一个AtomicInteger等于奖品的数量,然后让每个线程调用ai.decrementAndGet()。如果结果 >= 0,则线程可以领取奖品。如果结果 < 0,那么很抱歉,但没有奖品。

于 2015-08-27T14:31:19.463 回答
0

您错误地使用了CyclicBarrier。这是为了让多个线程同步。

一种同步辅助工具,它允许一组线程相互等待以达到共同的障碍点。CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程组必须偶尔相互等待。屏障被称为循环的,因为它可以在等待线程被释放后重新使用。

尽管我不确定您需要什么,但您可能需要使用信号量。

计数信号量。从概念上讲,信号量维护一组许可。如有必要,每个 acquire() 都会阻塞,直到获得许可,然后再接受它。每个 release() 添加一个许可,可能会释放一个阻塞的获取者。但是,没有使用实际的许可对象;Semaphore 只是对可用数量进行计数并采取相应措施。

您可能正在寻找AtomicInteger

可以自动更新的 int 值。有关原子变量属性的描述,请参阅 java.util.concurrent.atomic 包规范。AtomicInteger 用于原子递增计数器等应用程序,不能用作 Integer 的替代品。

于 2015-08-27T14:17:44.570 回答