I am experimenting with thread parking and decided to build a small service around it. This is what it looks like:

public class TestService {
    private static final Logger logger = LoggerFactory.getLogger(TestService.class); // logback I think this logger causes some troubles

    private final CountDownLatch stopLatch;
    private final Object parkBlocker = new Object();
    private volatile boolean stopped;
    private final Thread[] workers;

    public TestService(int parallelizm) {
        stopLatch = new CountDownLatch(parallelizm);
        workers = new Thread[parallelizm];
        for (int i = 0; i < parallelizm; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (!stopped) {
                        logger.debug("Parking " + Thread.currentThread().getName());
                        LockSupport.park(parkBlocker);
                        logger.debug(Thread.currentThread().getName() + " unparked");
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
        }
    }

    public void start() {
        Arrays.stream(workers).forEach(t -> {
            t.start();
            logger.debug(t.getName() + " started");
        });
    }

    public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        boolean stoppedSuccefully = false;
        this.stopped = true;
        unparkWorkers();
        if (stopLatch.await(timeout, unit)) {
            stoppedSuccefully = true;
        }
        return stoppedSuccefully;
    }

    private void unparkWorkers() {
        Arrays.stream(workers).forEach(w -> {
            LockSupport.unpark(w);
            logger.debug("Un-park call is done on " + w.getName());
        });
    }
}

The problem I am facing shows up when I test this service as follows:

public static void main(String[] args) throws InterruptedException {
  while(true) {
    TestService service = new TestService(2);
    service.start();
    if (!service.stop(10000, TimeUnit.MILLISECONDS))
      throw new RuntimeException();
  }
}

I sometimes get the following behavior:

14:58:55.226 [main] DEBUG com.pack.age.TestService - Thread-648 started
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Parking Thread-648
14:58:55.227 [main] DEBUG com.pack.age.TestService - Thread-649 started
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-648
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Thread-648 unparked
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-649
14:58:55.227 [Thread-649] DEBUG com.pack.age.TestService - Parking Thread-649
Exception in thread "main" java.lang.RuntimeException
    at com.pack.age.Test$.main(Test.scala:12)
    at com.pack.age.Test.main(Test.scala)

The thread hangs in park:

"Thread-649" #659 prio=5 os_prio=0 tid=0x00007efe4433f000 nid=0x7691 waiting on condition [0x00007efe211c8000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000720739a68> (a java.lang.Object)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at com.pack.age.TestService.lambda$new$0(TestService.java:27)
    at com.pack.age.TestService$$Lambda$1/1327763628.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:748)

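I don't see any race between park and unpark in the service. Moreover, if unpark is called before park, park is guaranteed not to block (that is what the javadocs say).

Maybe I am misusing LockSupport::park. Can you suggest a fix?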

1 Answer

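This has nothing to do with the logger, although its usage brings the problem to the surface. You have a race condition, as simple as that. Before explaining that race condition, you need to understand a few things from the LockSupport::unpark documentation: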

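Makes available the permit for the given thread, if it was not already available. If the thread was blocked on park then it will unblock. Otherwise, its next call to park is guaranteed not to block.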

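The first point follows from the quote above. The short version is: if you have a thread that has already been started but has not yet called park, and within that window (between the thread's start and its park) some other thread calls unpark on it, then that thread will not park at all. The permit will be available immediately. Maybe this little drawing will make it clearer: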

(ThreadA)  start ------------------ park --------- ....

(ThreadB)  start ----- unpark -----

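Notice how ThreadB calls unpark(ThreadA) in the period between ThreadA's start and its park. As a result, when ThreadA reaches park, it is guaranteed not to block, exactly as the documentation says.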
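The first scenario can be reproduced with a minimal sketch such as the one below. The class name EarlyUnparkDemo and the unparkIssued flag are assumptions made for illustration; they do not come from the question. The worker busy-waits on a volatile flag on purpose, because a blocking utility such as CountDownLatch may itself call park internally and consume the permit.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not part of the original service.
class EarlyUnparkDemo {
    private static volatile boolean unparkIssued;

    // Returns true if thread A got past park() even though unpark came first.
    static boolean run() {
        unparkIssued = false;
        Thread a = new Thread(() -> {
            // Busy-wait until the other thread has issued unpark;
            // no blocking call here, so nothing else can use up the permit.
            while (!unparkIssued) {
                Thread.yield();
            }
            // The permit is already available, so this does not block.
            LockSupport.park();
        });
        a.setDaemon(true); // do not keep the JVM alive if the demo misbehaves
        a.start();

        LockSupport.unpark(a); // A is started but has not parked yet
        unparkIssued = true;

        try {
            a.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !a.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("thread A finished: " + run());
    }
}
```

The join timeout is only a safety net so that the demo terminates even if the assumption about the permit were wrong.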

The second point from the same documentation is:

This operation is not guaranteed to have any effect at all if the given thread has not been started.

Let's look at that in a drawing:

Thread B calls unpark(ThreadA) --- Thread A starts --- Thread A calls park 

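After ThreadA calls park, it will hang forever, because ThreadB never calls unpark on it again. Notice that the call to unpark was made before ThreadA had started (unlike in the previous example).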

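This is exactly what happens in your case:

LockSupport.unpark(w); (from unparkWorkers) is called before t.start(); (from public void start(){...}). In simpler words: your code calls unpark on the workers before they have even started, so when they ultimately reach park they are stuck, and no one is able to unpark them. The fact that you see this with a logger and not with System::out is most probably related to the fact that println uses a synchronized method under the hood.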


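In fact, LockSupport offers exactly the semantics needed to prove this. For that we need the following class (and, for simplicity, a single worker: SOProblem service = new SOProblem(1);):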

static class ParkBlocker {

    private volatile int x;

    public ParkBlocker(int x) {
        this.x = x;
    }

    public int getX() {
        return x;
    }
}

Now we need to plug this into the proper methods. First, flag the fact that we have called unpark:

private void unparkWorkers() {
    Arrays.stream(workers).forEach(w -> {
        LockSupport.unpark(w);
        logger.debug("Un-park call is done on " + w.getName());
    });
    /*
     * add "1" to whatever there is already in pb.x, meaning
     * we have done unparking _also_
     */
    int y = pb.x;
    y = y + 1;
    pb.x = y;
}

Then reset the flag after a cycle ends:

public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
    boolean stoppedSuccefully = false;
    stopped = true;
    unparkWorkers();
    if (stopLatch.await(timeout, unit)) {
        stoppedSuccefully = true;
        // reset the flag
        pb.x = 0;
    }
    return stoppedSuccefully;
}

Then change the constructor to flag that the thread has started:

  .....
  while (!stopped) {
       logger.debug("Parking " + Thread.currentThread().getName());
       // flag the fact that thread has started. add "2", meaning
       // thread has started
       int y = pb.x;
       y = y + 2;
       pb.x = y;
       LockSupport.park(pb);
       logger.debug(Thread.currentThread().getName() + " unparked");
  }

Then, when your thread freezes, you need to inspect the flag:

 public static void main(String[] args) throws InterruptedException {
    while (true) {
        SOProblem service = new SOProblem(1); // <-- notice a single worker, for simplicity
        service.start();
        if (!service.stop(10000, TimeUnit.MILLISECONDS)) {
            service.debug();
            throw new RuntimeException();
        }
    }
}

Where the debug method is:

public void debug() {
    Arrays.stream(workers)
          .forEach(x -> {
              ParkBlocker pb = (ParkBlocker) LockSupport.getBlocker(x);
              if (pb != null) {
                  System.out.println("x = " + pb.getX());
              }
          });
}
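For reference, the mechanism that debug relies on is LockSupport.getBlocker, which returns the object passed to the most recent park(blocker) call of a thread that is still parked, or null otherwise. The following is a minimal, self-contained sketch of that lookup; the class name BlockerInspectionDemo and its helper method are assumptions for illustration only.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class showing how a parked thread's blocker can be read.
class BlockerInspectionDemo {
    private static volatile boolean done;

    // Parks a worker on the given blocker and returns what getBlocker reports
    // (null if the worker was never observed as parked within the time limit).
    static Object inspect(Object blocker) {
        done = false;
        Thread worker = new Thread(() -> {
            while (!done) {
                LockSupport.park(blocker); // records the blocker while parked
            }
        });
        worker.setDaemon(true);
        worker.start();

        // Poll until the worker is observed as parked, or give up after 5 seconds.
        Object seen = null;
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (seen == null && System.nanoTime() < deadline) {
            seen = LockSupport.getBlocker(worker);
            Thread.yield();
        }

        // Release the worker: set the flag first, then make the permit available.
        done = true;
        LockSupport.unpark(worker);
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        Object marker = new Object();
        System.out.println("blocker matches: " + (inspect(marker) == marker));
    }
}
```

Because the blocker is an ordinary object, it can carry diagnostic state, which is what the ParkBlocker class with its x field is used for here.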

When the problem reproduces, you have called unpark before you called park; that is the case when the output is x = 3.
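The question also asks for a fix, which the steps above do not spell out. One possible approach, offered as a sketch and not as this answer's established recommendation, is to make stop tolerant of a lost unpark by re-issuing it until the latch opens or the timeout expires. The class name RetryingStopService, the stressTest helper, and the 10 ms retry interval are all assumptions chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical variant of the service from the question.
class RetryingStopService {
    // Assumed retry interval; any small value would do.
    private static final long RETRY_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(10);

    private final Object parkBlocker = new Object();
    private final CountDownLatch stopLatch;
    private final Thread[] workers;
    private volatile boolean stopped;

    RetryingStopService(int parallelism) {
        stopLatch = new CountDownLatch(parallelism);
        workers = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (!stopped) {
                        LockSupport.park(parkBlocker);
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
        }
    }

    void start() {
        for (Thread t : workers) {
            t.start();
        }
    }

    boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        stopped = true;
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            // Re-issue unpark on every round, so a wakeup that was lost
            // or had no effect is simply repeated.
            for (Thread w : workers) {
                LockSupport.unpark(w);
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return stopLatch.getCount() == 0;
            }
            long wait = Math.min(remaining, RETRY_INTERVAL_NANOS);
            if (stopLatch.await(wait, TimeUnit.NANOSECONDS)) {
                return true;
            }
        }
    }

    // Runs the start/stop cycle from the question a fixed number of times.
    static boolean stressTest(int cycles) {
        try {
            for (int i = 0; i < cycles; i++) {
                RetryingStopService service = new RetryingStopService(2);
                service.start();
                if (!service.stop(10, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all cycles stopped cleanly: " + stressTest(100));
    }
}
```

The design choice here is to treat unpark as a hint that may need repeating and to rely on the volatile stopped flag as the actual source of truth, so the worker loop itself stays unchanged.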

answered 2020-04-10T22:44:32.283