

我正在将多线程作为一个主题来处理,因为我在文献中遇到了很多使用它的代码(例如 Android SDK),但我仍然觉得它晦涩难懂。




  • Get some advice on how to do the above, based on my efforts so far (code provided)





`Producer`    (a busy mum making chocolate-covered cakes for his child, up to a limit)
`Consumer`    (a hungry child waiting to eat all cakes the mum makes, until told to stop)
`SharedSpace` (a kitchen table on which the cakes are put as soon as they become ready)
`dataValue`   (a chocolate-dripping cake which MUST be eaten immediately or else...)

为了简化练习,我决定在孩子吃蛋糕的时候不让妈妈做饭她只会等孩子吃完蛋糕,然后立即再做一个,达到一定的限度,以进行良好的育儿。练习的本质是练习Thread的信号,而不是实现任何并发。相反,我专注于完美的序列化,没有轮询或“我可以走了吗?” 检查。我想我将不得不编写后续练习,其中母亲和孩子接下来并行“工作”。


  • 让我的类实现Runnable接口,以便它们有自己的代码入口点

  • 使用我的类作为Thread对象的构造函数参数,这些对象从程序的main入口点 实例化并启动

  • 通过Thread.join()确保main程序不会在Thread之前终止

  • Producer为将创建数据的次数设置限制Consumer

  • 就将用于表示数据生产结束的标记值达成一致Produce

  • 日志获取共享资源的锁和数据生产/消费事件,包括工作线程的最终签核

  • 从程序中创建一个SharedSpace对象,main并在开始之前将其传递给每个工作人员

  • 在内部为每个工作人员存储private对对象的引用SharedSpace

  • Consumer在生成任何数据之前提供防护和消息以描述准备好使用的条件

  • Producer在给定次数的迭代后停止

  • Consumer读取哨兵值后停止


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class Consumer extends Threaded {
  public Consumer(SharedSpace sharedSpace) {
  public void run() {
    int consumedData = 0;
    while (consumedData != -1) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        consumedData = sharedSpace.dataValue;
        if (consumedData == 0) {
          try {
            logger.info("Data production has not started yet. "
                + "Releasing lock on sharedSpace, "
                + "until notification that it has begun.");
          } catch (InterruptedException interruptedException) {
        } else if (consumedData == -1) {
          logger.info("Consumed: END (end of data production token).");
        } else {
          logger.info("Consumed: {}.", consumedData);
          logger.info("Waking up producer to continue data production.");
          try {
            logger.info("Releasing lock on sharedSpace "
                + "until notified of new data availability.");
          } catch (InterruptedException interruptedException) {
    logger.info("Signing off.");
class Producer extends Threaded {
  private static final int N_ITERATIONS = 10;
  public Producer(SharedSpace sharedSpace) {
  public void run() {
    int nIterations = 0;
    while (nIterations <= N_ITERATIONS) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        if (nIterations <= N_ITERATIONS) {
          sharedSpace.dataValue = nIterations;
          logger.info("Produced: {}", nIterations);
        } else {
          sharedSpace.dataValue = -1;
          logger.info("Produced: END (end of data production token).");
        logger.info("Waking up consumer for data consumption.");
        if (nIterations <= N_ITERATIONS) {
          try {
            logger.info("Releasing lock on sharedSpace until notified.");
          } catch (InterruptedException interruptedException) {
    logger.info("Signing off.");
class SharedSpace {
  volatile int dataValue = 0;
abstract class Threaded implements Runnable {
  protected Logger logger;
  protected SharedSpace sharedSpace;
  public Threaded(SharedSpace sharedSpace) {
    this.sharedSpace = sharedSpace;
    logger = LoggerFactory.getLogger(this.getClass());
  public void run() {
    String workerName = getClass().getName();
public class ProducerConsumer {
  public static void main(String[] args) {
    SharedSpace sharedSpace = new SharedSpace();
    Thread producer = new Thread(new Producer(sharedSpace), "Producer");
    Thread consumer = new Thread(new Consumer(sharedSpace), "Consumer");
    try {
    } catch (InterruptedException interruptedException) {


Consumer - Started.
Consumer - Acquired lock on sharedSpace.
Consumer - Data production has not started yet. Releasing lock on sharedSpace, until notification that it has begun.
Producer - Started.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 1
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 1.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 2
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 2.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 3
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 3.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 4
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 4.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 5
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 5.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 6
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 6.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 7
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 7.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 8
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 8.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 9
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 9.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 10
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 10.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: END (end of data production token).
Producer - Waking up consumer for data consumption.
Producer - Signing off.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: END (end of data production token).
Consumer - Signing off.


  • 以上是正确的吗?(例如,它是否使用了正确的语言工具、正确的方法、是否包含任何愚蠢的代码……)




一个好的问题只关注one requested piece of advice(上述问题),但如果您愿意,请随时在您的答案中提及对以下其他主题的任何见解:

  • 在编写下一次尝试时如何测试并行代码?

  • 哪些工具可以帮助我进行开发和调试?考虑我使用Eclipse

  • 如果我允许Producer继续生产,每次生产花费一些可变的时间,而Consumer消费任何可用的东西,方法会改变吗?锁定是否必须移动到其他地方?信号是否需要从这种等待/通知范式中改变?

  • 这种做事的方法是否已经过时,我应该学习其他东西吗?从这个收费站,我不知道“在 Java 的真实世界中”会发生什么


  • 我应该从这里去哪里?我在某处看到过“未来”的概念,但我可以使用一个编号的主题列表来按顺序完成,按教学顺序进行,并带有指向相关学习资源的链接



我看到的唯一问题是@Tudor 和@Bhaskar 提到的问题。每当你在等待一个条件时测试它,你必须使用一个while循环。然而,这更多的是与多个生产者和消费者的竞争条件。可能会发生虚假唤醒,但竞争条件更有可能发生。请参阅我关于该主题的页面

是的,您只有 1 个生产者和 1 个消费者,但您可以尝试为多个消费者扩展代码或将代码复制到另一个场景。








在多线程代码测试方面,了解如何读取kill -QUIT线程转储并查看 Jconsole 内部正在运行的线程。像YourKit这样的 Java 分析器也可能会有所帮助。

如果我允许 Producer 继续制作,每次制作都需要一些可变的时间,这种方法会改变吗?


这种做事的方法是否已经过时,我应该学习其他东西吗?从这个收费站,我不知道“在 Java 的真实世界中”会发生什么

接下来是学习ExecutorService课程。它们处理大部分new Thread()样式代码——尤其是当您处理大量使用线程执行的异步任务时。这是一个教程


再次,ExecutorService。我假设您已阅读此起始文档。正如@Bhaskar 所提到的,Java Concurrency in Practice是一本很好的圣经。


  • SharedSpaceandThreaded类似乎是一种人为的方式来做到这一点。如果您正在玩基类等,那很好。但总的来说,我从不使用这样的模式。生产者和消费者通常使用BlockingQueue类似的东西LinkedBlockingQueue,在这种情况下,同步和volatile有效负载会为您处理。此外,我倾向于将共享信息注入对象构造函数,而不是从基类中获取。

  • 通常,如果我使用synchronized它是在一个private final字段上。private final Object lockObject = new Object();除非我已经在使用对象,否则我通常会创建一个用于锁定的对象。

  • 小心synchronized大块并将日志消息放在synchronized部分内。日志通常synchronized对文件系统进行 IO,这可能非常昂贵。如果可能的话,你应该有小的、非常紧的synchronized块。

  • consumedData在循环之外定义。我会在分配点定义它,然后使用 abreak从循环中退出(如果是)== -1。如果可能,请确保限制您的局部变量范围。

  • 您的日志消息将主导您的代码性能。这意味着当您删除它们时,您的代码将执行完全不同的操作。当您使用它来调试问题时,意识到这一点非常重要。当您迁移到具有不同 CPU/内核的不同架构时,性能也(很可能)会发生变化。

  • 您可能知道这一点,但是当您调用时sharedSpace.notify();,这仅意味着如果另一个线程当前在中,则会通知sharedSpace.wait();它。如果它不是其他东西,那么它将错过通知。仅供参考。

  • 做 a 有点奇怪if (nIterations <= N_ITERATIONS),然后在下面的 3 行else再做一次。复制notify()会更好地简化分支。

  • 然后你有int nIterations = 0;一个while然后在一个++中。这是 for 循环的秘诀:

    for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {


public class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;
    public Consumer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    public void run() {
       while (true) {
          int consumedData = queue.take();
          if (consumedData ==  Producer.FINAL_VALUE) {
              logger.info("Consumed: END (end of data production token).");
          logger.info("Consumed: {}.", consumedData);
       logger.info("Signing off.");

public class Producer implements Runnable {
    public static final int FINAL_VALUE = -1;
    private final BlockingQueue<Integer> queue;
    public Producer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    public void run() {
       for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
          logger.info("Produced: {}", nIterations);
       logger.info("Produced: END (end of data production token).");
       logger.info("Signing off.");

public class ProducerConsumer {
    public static void main(String[] args) {
       // you can add an int argument to the LinkedBlockingQueue constructor
       // to only allow a certain number of items in the queue at one time
       BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
       Thread producer = new Thread(new Producer(queue), "Producer");
       Thread consumer = new Thread(new Consumer(queue), "Consumer");
       // start and join go here
编辑:Bhaskar 提出了一个很好的观点,即使用 awhile来包装对wait. 这是因为可能发生臭名昭著的虚假唤醒,迫使线程wait过早退出,因此您需要确保它重新进入。

您接下来可以做的是实现一个有限缓冲区生产者消费者:拥有一些共享数据结构,例如链表并设置最大大小(例如 10 个项目)。然后让生产者继续生产,并且只有在队列中有 10 个项目时才暂停它。只要缓冲区为空,消费者就会被挂起。



Producers and Consumers can be simple classes implementing Runnable ( no extends Threaded ) That way they are less fragile. Clients can create Threads themeselves and attach the instances so no overhead of a class hierarchy is needed.

Your condition before you wait() should be a while() rather than an if .

edit: from JCIP page 301 :

void stateDependentMethod() throws InterruptedException {
      // condition predicate must be guarded by lock
      synchronized(lock) {
          while (!conditionPredicate())
          // object is now in desired state

You have builtin the condition to stop statically. Normally , producers and consumers ought to be more flexible- they should be able to respond to an external signal for stopping.

For starters , to implement an external stop signal , you have a flag :

class Producer implements Runnable { 
     private volatile boolean stopRequested ;

     public void run() {
           if(stopRequested )
                // get out of the loop

     public void stop(){
        stopRequested  = true;
        // arrange to  interrupt the Producer thread here.

When you try to implement the above , you will probably see that there are other complications that arise - for example - your producer is first publishing and then wait() ing , but that can lead to problems.

If you are interested in further reading , I suggest read the book - Java Concurrency In Practice. This will have a whole lot of recommendations than I can add here.

巨大的野心!大约 8 年前,你问过这个问题。我希望您的努力为您提供(并继续为您提供)您想要的教育。

这些天来wait(),强烈建议不要在 Java 中实现多线程。当你试图在这个低级别控制并发时,你很容易自责(事实上,Java 设计者承认许多方法和语义实际上是设计错误,但为了向后兼容的目的,他们不得不留下它们—— - 许多人正在使用新的“虚拟线程”(Project Loom)——但这是一个不同的话题)。notify()join()Thread

今天手动启动和控制线程的首选方式是 via ExecutorService.submit(Callable<V>),返回一个Future<V>. 然后,您可以通过调用等待线程退出(并获取返回值)Future<V>.get(),返回由可调用对象返回的类型值V(或者ExecutionException如果Callable抛出未捕获的异常则抛出)。


import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public abstract class ProducerConsumer<E> {

    private final BlockingQueue<Optional<E>> queue;

    public ProducerConsumer(
            int numProducerThreads, int numConsumerThreads, int queueCapacity) {
        if (numProducerThreads < 1 || numConsumerThreads < 1 || queueCapacity < 1) {
            throw new IllegalArgumentException();
        queue = new ArrayBlockingQueue<Optional<E>>(queueCapacity);
        final ExecutorService executor = 
                Executors.newFixedThreadPool(numProducerThreads + numConsumerThreads);
        try {
            // Start producer threads
            final List<Future<?>> producerFutures = new ArrayList<>();
            final AtomicInteger numLiveProducers = new AtomicInteger();
            for (int i = 0; i < numProducerThreads; i++) {
                producerFutures.add(executor.submit(() -> {
                    // Run producer
                    // When last producer finishes, deliver poison pills to consumers
                    if (numLiveProducers.decrementAndGet() == 0) {
                        for (int j = 0; j < numConsumerThreads; j++) {
                    return null;
            // Start consumer threads
            final List<Future<?>> consumerFutures = new ArrayList<>();
            for (int i = 0; i < numConsumerThreads; i++) {
                consumerFutures.add(executor.submit(() -> {
                    // Run Consumer
                    return null;
            // Wait for all producers to complete
            completionBarrier(producerFutures, false);
            // Shut down any consumers that are still running after producers complete
            completionBarrier(consumerFutures, false);
        } finally {

    private static void completionBarrier(List<Future<?>> futures, boolean cancel) {
        for (Future<?> future : futures) {
            try {
                if (cancel) {
            } catch (CancellationException | InterruptedException e) {
                // Ignore
            } catch (ExecutionException e) {
                throw new RuntimeException(e);

    protected void produce(E val) {
        try {
        } catch (InterruptedException e) {
            throw new RuntimeException(e);

    protected Optional<E> consume() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);

    /** Producer loop. Call {@link #produce(E)} for each element. */
    public abstract void producer();

     * Consumer thread. Call {@link #consume()} to get each successive element,
     * until an empty {@link Optional} is returned.
    public abstract void consumer();


new ProducerConsumer<Integer>(/* numProducerThreads = */ 1, /* numConsumerThreads = */ 4,
        /* queueCapacity = */ 10) {
    public void producer() {
        for (int i = 0; i < 100; i++) {
            System.out.println("Producing " + i);

    public void consumer() {
        for (Optional<Integer> opt; (opt = consume()).isPresent; ) {
            int i = opt.get();
            System.out.println("Got " + i);
