
Background

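I can't afford school, so I work night shifts in a tollbooth and use the internet to teach myself some coding skills. I hope to get a better job tomorrow, or to sell online some of the apps I make. Long nights, few customers.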

I am tackling multithreading as a topic because I come across a lot of code in the literature (e.g. the Android SDK) that uses it, but I still find it obscure.

Spirit

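My approach at this point is to code the most basic multithreading example I can think of, bang my head against the wall a bit, and see whether I can stretch my brain to accommodate some novel way of thinking. I am exposing myself to my limits in the hope of going beyond them. Feel free to criticize wildly, nitpick, and point out better ways of doing what I am trying to do.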

Objective

  • Get some advice on how to do the above, based on my efforts so far (code provided)

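Exercise

Here is the scope I defined:

Definition

Create two classes that work in concert on the production and consumption of data objects. One thread creates objects and delivers them to a shared space, where the other thread picks them up and consumes them. Let's call the producing thread Producer, the consuming thread Consumer and the shared space SharedSpace. Producing objects for someone else to consume can be understood by analogy with this scenario: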

`Producer`    (a busy mum making chocolate-covered cakes for her child, up to a limit)
`Consumer`    (a hungry child waiting to eat all cakes the mum makes, until told to stop)
`SharedSpace` (a kitchen table on which the cakes are put as soon as they become ready)
`dataValue`   (a chocolate-dripping cake which MUST be eaten immediately or else...)

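To simplify the exercise, I decided not to let the mum cook while the child is eating a cake. She just waits for the child to finish, then immediately makes another one, up to a certain limit, for good parenting. The essence of the exercise is to practice signaling between Threads, not to achieve any concurrency. On the contrary, I am aiming for perfect serialization, with no polling or "can I go yet?" checks. I guess I will have to write a follow-up exercise in which mother and child "work" in parallel.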

Approach

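  • Have my classes implement the Runnable interface so that they have their own code entry point

  • Use my classes as constructor arguments to Thread objects, which are instantiated and started from the program's main entry point

  • Ensure via Thread.join() that the main program does not terminate before the Threads

  • Have the Producer set a limit on the number of times it will create data for the Consumer

  • Agree on a sentinel value the Producer will use to signal the end of data production

  • Log the acquisition of locks on the shared resource and the data production/consumption events, including the final sign-off of the worker threads

  • Create a single SharedSpace object in the program's main and pass it to each worker before starting them

  • Store a private reference to the SharedSpace object inside each worker

  • Provide a guard and messages in the Consumer to describe its state of being ready to consume before any data has been produced

  • Stop the Producer after a given number of iterations

  • Stop the Consumer after it reads the sentinel value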

Code


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class Consumer extends Threaded {
  public Consumer(SharedSpace sharedSpace) {
    super(sharedSpace);
  }
  @Override
  public void run() {
    super.run();
    int consumedData = 0;
    while (consumedData != -1) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        consumedData = sharedSpace.dataValue;
        if (consumedData == 0) {
          try {
            logger.info("Data production has not started yet. "
                + "Releasing lock on sharedSpace, "
                + "until notification that it has begun.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        } else if (consumedData == -1) {
          logger.info("Consumed: END (end of data production token).");
        } else {
          logger.info("Consumed: {}.", consumedData);
          logger.info("Waking up producer to continue data production.");
          sharedSpace.notify();
          try {
            logger.info("Releasing lock on sharedSpace "
                + "until notified of new data availability.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        }
      }
    }
    logger.info("Signing off.");
  }
}
class Producer extends Threaded {
  private static final int N_ITERATIONS = 10;
  public Producer(SharedSpace sharedSpace) {
    super(sharedSpace);
  }
  @Override
  public void run() {
    super.run();
    int nIterations = 0;
    while (nIterations <= N_ITERATIONS) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        nIterations++;
        if (nIterations <= N_ITERATIONS) {
          sharedSpace.dataValue = nIterations;
          logger.info("Produced: {}", nIterations);
        } else {
          sharedSpace.dataValue = -1;
          logger.info("Produced: END (end of data production token).");
        }
        logger.info("Waking up consumer for data consumption.");
        sharedSpace.notify();
        if (nIterations <= N_ITERATIONS) {
          try {
            logger.info("Releasing lock on sharedSpace until notified.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        }
      }
    }
    logger.info("Signing off.");
  }
}
class SharedSpace {
  volatile int dataValue = 0;
}
abstract class Threaded implements Runnable {
  protected Logger logger;
  protected SharedSpace sharedSpace;
  public Threaded(SharedSpace sharedSpace) {
    this.sharedSpace = sharedSpace;
    logger = LoggerFactory.getLogger(this.getClass());
  }
  @Override
  public void run() {
    logger.info("Started.");
    String workerName = getClass().getName();
    Thread.currentThread().setName(workerName);
  }
}
public class ProducerConsumer {
  public static void main(String[] args) {
    SharedSpace sharedSpace = new SharedSpace();
    Thread producer = new Thread(new Producer(sharedSpace), "Producer");
    Thread consumer = new Thread(new Consumer(sharedSpace), "Consumer");
    producer.start();
    consumer.start();
    try {
      producer.join();
      consumer.join();
    } catch (InterruptedException interruptedException) {
      interruptedException.printStackTrace();
    }
  }
}

Execution log


Consumer - Started.
Consumer - Acquired lock on sharedSpace.
Consumer - Data production has not started yet. Releasing lock on sharedSpace, until notification that it has begun.
Producer - Started.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 1
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 1.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 2
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 2.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 3
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 3.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 4
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 4.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 5
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 5.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 6
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 6.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 7
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 7.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 8
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 8.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 9
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 9.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 10
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 10.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: END (end of data production token).
Producer - Waking up consumer for data consumption.
Producer - Signing off.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: END (end of data production token).
Consumer - Signing off.

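Question

  • Is the above correct? (e.g. does it use the right language tools and the right approach, and does it contain any stupid code?)

But it "looks right"?

I ask about correctness even though the output "looks good", because you can't imagine how many times things went wrong in my tests "one time" and not "the next" (e.g. when the Consumer started first, or when the Producer never exited after producing the sentinel). I have learned not to claim correctness from "a successful run". On the contrary, I have become very suspicious of pseudo-parallel code! (And by definition this one is not even parallel!)

Extended answer

A good question focuses on just one requested piece of advice (the one above), but if you like, feel free to mention in your answer any insights on the following other topics:

  • How could I test parallel code as I write my next attempts?

  • What tools can help me with both development and debugging? Consider that I use Eclipse

  • Would the approach change if I allowed the Producer to keep producing, with each production taking a variable amount of time, while the Consumer consumes whatever is available? Would the locking have to move elsewhere? Would the signaling need to change from this wait/notify paradigm?

  • Is this way of doing things obsolete, and should I be learning something else? From this tollbooth, I have no idea what happens "in the real world of Java"

Next steps

  • Where should I go from here? I have seen the notion of "futures" mentioned somewhere, but I could use a numbered list of topics to work through in sequence, in pedagogical order, with links to relevant learning resources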

Tino Sino


4 Answers


Is the above correct?

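The only problem I see is the one @Tudor and @Bhaskar mentioned. Whenever you test a condition that you are waiting on, you must use a while loop. However, this matters more for race conditions with multiple producers and consumers. Spurious wakeups can happen, but race conditions are much more likely. See my page on the topic.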
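To make the while advice concrete, here is a minimal sketch of the question's one-cake-at-a-time handoff with every wait() guarded by a while loop. The Handoff and HandoffDemo names are hypothetical, not from the question. The sketch assumes a private lock object and a "full" flag in place of the 0/-1 checks on dataValue. It calls notifyAll() so that every waiter re-checks its own condition, which keeps it correct if more producers or consumers are added later.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical one-slot handoff (the "kitchen table"): holds at most one value.
class Handoff {
    private final Object lock = new Object(); // private lock, not a shared public object
    private int value;
    private boolean full;                     // guarded by lock

    void put(int v) throws InterruptedException {
        synchronized (lock) {
            while (full) {      // re-check after every wakeup, spurious or not
                lock.wait();
            }
            value = v;
            full = true;
            lock.notifyAll();   // wake every waiter; each re-checks its own condition
        }
    }

    int take() throws InterruptedException {
        synchronized (lock) {
            while (!full) {
                lock.wait();
            }
            full = false;
            lock.notifyAll();
            return value;
        }
    }
}

class HandoffDemo {
    static final int END = -1; // end-of-production sentinel, as in the question

    // Runs one producer and one consumer thread and returns what the consumer ate.
    static List<Integer> run(int n) {
        Handoff table = new Handoff();
        List<Integer> eaten = new ArrayList<>(); // only touched by the consumer until join()
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    table.put(i);
                }
                table.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        Thread consumer = new Thread(() -> {
            try {
                for (int v = table.take(); v != END; v = table.take()) {
                    eaten.add(v);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        consumer.start(); // start order no longer matters: the while loops handle either
        producer.start();
        try {
            producer.join();
            consumer.join(); // join() makes the consumer's writes to 'eaten' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return eaten;
    }

    public static void main(String[] args) {
        System.out.println(run(10)); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

Because the condition is re-tested in a loop, it no longer matters which thread starts first. A wakeup that arrives for the "wrong" reason just sends the thread back to wait().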

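Yes, you only have 1 producer and 1 consumer, but you might try extending the code to multiple consumers, or copying it into another scenario.

I have learned not to claim correctness from "a successful run". On the contrary, I have become very suspicious of pseudo-parallel code!

Good instincts.

How could I test parallel code as I write my next attempts?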

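This is very hard. Scaling up is one approach: add multiple producers and consumers and see whether problems show up. Run on multiple architectures with different numbers and types of processors. Your best defense will be code correctness: tight synchronization, and good use of classes such as BlockingQueue and ExecutorService to make your shutdown simpler and cleaner.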
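As a rough illustration of "scaling up", the sketch below runs several producers and consumers over one small bounded queue, many times over. It checks that every number was consumed exactly once and treats a timeout as a likely deadlock or lost wakeup. The StressTest class, the queue size of 16 and the 30-second timeouts are arbitrary assumptions. A passing run only shows that these particular interleavings worked, not that the code is correct.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stress test: each producer emits its own disjoint range of numbers;
// afterwards we check that every number was consumed exactly once and nothing hung.
class StressTest {
    private static final long POISON = -1L; // end-of-data sentinel (real values are >= 0)

    static boolean run(int producers, int consumers, int perProducer) {
        BlockingQueue<Long> queue = new ArrayBlockingQueue<>(16); // small, to force blocking
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        AtomicBoolean duplicate = new AtomicBoolean(false);
        ExecutorService pool = Executors.newFixedThreadPool(producers + consumers);
        try {
            List<Future<?>> producerTasks = new ArrayList<>();
            for (int p = 0; p < producers; p++) {
                final long base = (long) p * perProducer;
                producerTasks.add(pool.submit(() -> {
                    for (long i = 0; i < perProducer; i++) {
                        queue.put(base + i);
                    }
                    return null; // returning a value makes this a Callable, so put() may throw
                }));
            }
            List<Future<?>> consumerTasks = new ArrayList<>();
            for (int c = 0; c < consumers; c++) {
                consumerTasks.add(pool.submit(() -> {
                    for (long v = queue.take(); v != POISON; v = queue.take()) {
                        if (!seen.add(v)) {
                            duplicate.set(true); // the same value was consumed twice
                        }
                    }
                    return null;
                }));
            }
            for (Future<?> f : producerTasks) {
                f.get(30, TimeUnit.SECONDS); // a timeout here suggests a deadlock or lost wakeup
            }
            for (int c = 0; c < consumers; c++) {
                queue.put(POISON); // one pill per consumer
            }
            for (Future<?> f : consumerTasks) {
                f.get(30, TimeUnit.SECONDS);
            }
        } catch (Exception e) {
            return false; // a timeout, interruption or task failure all count as a failed run
        } finally {
            pool.shutdownNow();
        }
        return !duplicate.get() && seen.size() == producers * perProducer;
    }

    public static void main(String[] args) {
        // Repeat many times: the interleavings differ from run to run.
        for (int round = 0; round < 50; round++) {
            if (!run(4, 3, 1_000)) {
                System.out.println("Failure in round " + round);
                return;
            }
        }
        System.out.println("All rounds passed (which still does not prove correctness)");
    }
}
```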

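There is no easy answer. Testing multithreaded code is extremely hard.

What tools can help me with both development and debugging?

For general tooling, I would look into a coverage tool like Emma, so you can make sure your unit tests cover all of your code.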

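For testing multithreaded code specifically, learn how to read kill -QUIT thread dumps and how to look at running threads inside JConsole. Java profilers such as YourKit may also help.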
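Besides kill -QUIT (or jstack) and JConsole, you can capture similar information from inside the program through the standard java.lang.management API. This can be handy in a test that seems to hang. Here is a minimal sketch, assuming a hypothetical ThreadDump helper. The exact text that ThreadInfo.toString() produces varies between JDK versions.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Sketch: an in-process thread dump plus a deadlock check, using ThreadMXBean.
class ThreadDump {
    static String dump() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // Report held monitors/synchronizers only if this JVM supports it.
        ThreadInfo[] infos = mx.dumpAllThreads(
                mx.isObjectMonitorUsageSupported(), mx.isSynchronizerUsageSupported());
        for (ThreadInfo info : infos) {
            // toString() shows name, state, the lock being waited on and the
            // top frames of the stack (the JDK truncates long stacks).
            sb.append(info);
        }
        // Both methods return null when no deadlock is found.
        long[] deadlocked = mx.isSynchronizerUsageSupported()
                ? mx.findDeadlockedThreads()
                : mx.findMonitorDeadlockedThreads();
        sb.append(deadlocked == null
                ? "No deadlock detected\n"
                : "Deadlock among " + deadlocked.length + " threads\n");
        return sb.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        Object sharedSpace = new Object();
        // A thread parked in wait(), like the question's Consumer before production starts.
        Thread consumer = new Thread(() -> {
            synchronized (sharedSpace) {
                try {
                    sharedSpace.wait();
                } catch (InterruptedException e) {
                    // interrupted: just exit
                }
            }
        }, "Consumer");
        consumer.start();
        Thread.sleep(100); // crude: give it time to reach wait() (demo only)
        System.out.println(dump()); // "Consumer" should be listed in the WAITING state
        consumer.interrupt();
        consumer.join();
    }
}
```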

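Would the approach change if I allowed the Producer to keep producing, with each production taking a variable amount of time?

I don't think so. The consumer will simply wait on the producer indefinitely. Maybe I'm not understanding the question?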

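Is this way of doing things obsolete, and should I be learning something else? From this tollbooth, I have no idea what happens "in the real world of Java"

The next thing to learn is the ExecutorService classes. They handle a large share of new Thread()-style code, especially when you are dealing with many asynchronous tasks executed by threads. Here's a tutorial.

Where should I go from here?

Again, ExecutorService. I assume you have read these starting docs. As @Bhaskar mentioned, Java Concurrency in Practice is a good bible.


Here are some general comments about your code: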

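  • The SharedSpace and Threaded classes seem like a contrived way to do this. If you are playing around with base classes and the like, that's fine, but in general I never use a pattern like this. Producers and consumers usually work with a BlockingQueue such as LinkedBlockingQueue, in which case the synchronization and the volatile payload are taken care of for you. Also, I tend to inject shared information into an object's constructor rather than getting it from a base class.

  • Typically, if I use synchronized, it is on a private final field. Unless I am already working with an object, I usually create one just for locking: private final Object lockObject = new Object();

  • Be careful with large synchronized blocks and with putting log messages inside synchronized sections. Logging usually does synchronized IO to the file system, which can be very expensive. If possible, keep your synchronized blocks small and very tight.

  • You define consumedData outside of the loop. I would define it at the point of assignment and then use a break to exit the loop if it is == -1. Limit the scope of your local variables whenever possible.

  • Your log messages are going to dominate your code's performance. This means that when you remove them, your code will behave completely differently. This is very important to realize when you use them to debug a problem. The performance will also (most likely) change when you move to a different architecture with different CPUs/cores.

  • You probably know this, but calling sharedSpace.notify() only notifies another thread if that thread is currently inside sharedSpace.wait(). If it is doing something else, it will miss the notification. Just FYI.

  • It's a little strange to test if (nIterations <= N_ITERATIONS) and then, 3 lines below the else, test it again. Duplicating the notify() would simplify the branching.

  • Then you have int nIterations = 0;, then a while, and then a ++ inside it. That's a recipe for a for loop: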

    for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
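Several of the points above can be seen together in a small one-shot gate: a private final lock object, tiny synchronized blocks with logging outside them, and notify() being lost when nobody is waiting. Gate and GateDemo are hypothetical names, and the class does essentially what java.util.concurrent.CountDownLatch with a count of 1 already provides. The key idea is that the event is recorded in a field, so a missed notification does no harm.

```java
// Hypothetical one-shot gate illustrating:
//  1. locking on a private final object that no outside code can synchronize on,
//  2. keeping synchronized blocks tiny and doing I/O (logging) outside them,
//  3. recording the event in a field, so a notify while nobody waits is harmless.
class Gate {
    private final Object lock = new Object();
    private boolean open; // guarded by lock

    void open() {
        synchronized (lock) {
            open = true;
            lock.notifyAll();
        }
        System.out.println(Thread.currentThread().getName() + ": gate opened"); // outside the lock
    }

    void await() throws InterruptedException {
        synchronized (lock) {
            while (!open) { // if open() already ran, wait() is never called
                lock.wait();
            }
        }
        System.out.println(Thread.currentThread().getName() + ": passed the gate");
    }

    boolean isOpen() {
        synchronized (lock) {
            return open;
        }
    }
}

class GateDemo {
    public static void main(String[] args) throws InterruptedException {
        Gate early = new Gate();
        early.open();  // notifyAll() with nobody waiting: that notification is lost...
        early.await(); // ...but the state is not, so this returns immediately

        Gate late = new Gate();
        Thread waiter = new Thread(() -> {
            try {
                late.await(); // blocks until main opens the gate
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        waiter.start();
        late.open();
        waiter.join();
    }
}
```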
    

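Here's a much tighter version of your code. It is just an example of how I would write it. Again, apart from the missing while, there seems to be nothing wrong with your version.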

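import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Consumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
    private final BlockingQueue<Integer> queue;
    public Consumer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    }
    @Override
    public void run() {
       try {
          while (true) {
             // take() blocks until the producer has put something in the queue
             int consumedData = queue.take();
             if (consumedData == Producer.FINAL_VALUE) {
                logger.info("Consumed: END (end of data production token).");
                break;
             }
             logger.info("Consumed: {}.", consumedData);
          }
       } catch (InterruptedException e) {
          // restore the interrupt status and stop consuming
          Thread.currentThread().interrupt();
       }
       logger.info("Signing off.");
    }
}

class Producer implements Runnable {
    public static final int FINAL_VALUE = -1;
    private static final int N_ITERATIONS = 10;
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private final BlockingQueue<Integer> queue;
    public Producer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    }
    @Override
    public void run() {
       try {
          // produces 1..N_ITERATIONS, as in the question, then the end token
          for (int nIterations = 1; nIterations <= N_ITERATIONS; nIterations++) {
             logger.info("Produced: {}", nIterations);
             queue.put(nIterations);
          }
          queue.put(FINAL_VALUE);
          logger.info("Produced: END (end of data production token).");
       } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
       }
       logger.info("Signing off.");
    }
}

public class ProducerConsumer {
    public static void main(String[] args) throws InterruptedException {
       // you can add an int argument to the LinkedBlockingQueue constructor
       // to only allow a certain number of items in the queue at one time
       BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
       Thread producer = new Thread(new Producer(queue), "Producer");
       Thread consumer = new Thread(new Consumer(queue), "Consumer");
       producer.start();
       consumer.start();
       producer.join();
       consumer.join();
    }
}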
answered 2012-09-25T20:51:45.603

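You seem to have done a pretty good job here; there is not much to nitpick. One thing I would recommend is to avoid synchronizing on the buffer object itself. In this case it doesn't matter, but suppose you switch to a data-structure buffer: depending on the class, it might synchronize internally (e.g. Vector, although it is obsolete by now), so acquiring a lock on it from the outside could mess things up.

Edit: Bhaskar makes a good point about wrapping calls to wait in a while. The infamous spurious wakeups can force a thread out of wait prematurely, so you need to make sure it goes back in.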

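What you could do next is implement a bounded-buffer producer-consumer: use some shared data structure, such as a linked list, with a maximum size (e.g. 10 items). Then let the producer keep producing, suspending it only while the queue holds 10 items. The consumer is suspended whenever the buffer is empty.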
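Here is a minimal sketch of that bounded buffer, assuming a LinkedList as the shared structure. Instead of wait()/notify() on a single monitor, it swaps in a ReentrantLock with two Condition objects (notFull and notEmpty), so each signal wakes only the side that can make progress. BoundedBuffer and BoundedBufferDemo are hypothetical names. To keep the sketch short, the consumer stops after a known count rather than at a sentinel.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer: producers wait on notFull, consumers on notEmpty.
class BoundedBuffer<E> {
    private final Queue<E> items = new LinkedList<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBuffer(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    void put(E item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // producer is suspended while the buffer is full
            }
            items.add(item);
            notEmpty.signal();   // wake one waiting consumer, if any
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await(); // consumer is suspended while the buffer is empty
            }
            E item = items.remove();
            notFull.signal();     // wake one waiting producer, if any
            return item;
        } finally {
            lock.unlock();
        }
    }
}

class BoundedBufferDemo {
    // Moves 1..n from one producer to one consumer through a buffer of the given capacity.
    static List<Integer> transfer(int n, int capacity) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(capacity);
        List<Integer> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    received.add(buffer.take()); // count-based stop keeps the sketch short
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(100, 10).size() + " items transferred");
    }
}
```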

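The step after that is learning how to automate the process you have implemented by hand. Look at BlockingQueue, which provides a buffer with blocking behavior: the consumer automatically blocks if the buffer is empty, and the producer automatically blocks if it is full.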
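You can see that built-in blocking behavior without any threads at all. The sketch below fills an ArrayBlockingQueue with capacity 2 and contrasts the blocking methods (put/take) with the non-blocking offer/poll and their timed variants. The "cake" strings and the BlockingQueueTour name are just for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of BlockingQueue's behavior on a table with room for 2 cakes:
// put()/take() block, offer()/poll() return at once, timed variants wait a bounded time.
class BlockingQueueTour {
    static String tour() {
        BlockingQueue<String> table = new ArrayBlockingQueue<>(2);
        StringBuilder log = new StringBuilder();
        try {
            table.put("cake-1"); // room available: returns immediately
            table.put("cake-2"); // table is now full; a third put() would block
            log.append("offer when full: ").append(table.offer("cake-3")).append('\n');
            log.append("timed offer when full: ")
               .append(table.offer("cake-3", 50, TimeUnit.MILLISECONDS)).append('\n');
            log.append("take: ").append(table.take()).append('\n'); // FIFO: cake-1 first
            log.append("take: ").append(table.take()).append('\n');
            // The table is empty now; take() would block until a producer put() something.
            log.append("poll when empty: ").append(table.poll()).append('\n');
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(tour());
        // offer when full: false
        // timed offer when full: false
        // take: cake-1
        // take: cake-2
        // poll when empty: null
    }
}
```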

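Also, depending on the situation, executors (look at ExecutorService) can be a worthy replacement, since they encapsulate a task queue and one or more workers (consumers), so all you need to write is the producer.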
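Here is a sketch of that idea, assuming a fixed pool of worker threads as the "consumers". The calling code only produces tasks, and the executor's internal queue plays the role of SharedSpace. Calling shutdown() and then awaitTermination() replaces the end-of-production sentinel. ExecutorAsConsumer, the 10-second timeout and the pool size are assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// The executor's internal task queue plus its worker threads stand in for
// SharedSpace + Consumer: the calling code only has to produce tasks.
class ExecutorAsConsumer {
    static int eatCakes(int cakes, int children) {
        ExecutorService workers = Executors.newFixedThreadPool(children);
        AtomicInteger eaten = new AtomicInteger();
        for (int i = 1; i <= cakes; i++) {
            final int cake = i;
            workers.execute(() -> {
                // runs later, on one of the pool threads
                eaten.incrementAndGet();
                System.out.println(Thread.currentThread().getName() + " ate cake " + cake);
            });
        }
        workers.shutdown(); // accept no new tasks; already queued tasks still run
        try {
            if (!workers.awaitTermination(10, TimeUnit.SECONDS)) {
                workers.shutdownNow(); // give up on stragglers
            }
        } catch (InterruptedException e) {
            workers.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return eaten.get();
    }

    public static void main(String[] args) {
        System.out.println(eatCakes(10, 3) + " cakes eaten");
    }
}
```

Note that the order in which cakes are eaten is no longer guaranteed once there is more than one worker thread.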

answered 2012-09-23T17:34:00.183

Producers and Consumers can be simple classes implementing Runnable (no extends Threaded). That way they are less fragile. Clients can create the Threads themselves and attach the instances, so no class-hierarchy overhead is needed.

Your condition before you wait() should be a while() rather than an if.

Edit: from JCIP, page 301:

void stateDependentMethod() throws InterruptedException {
      // condition predicate must be guarded by lock
      synchronized(lock) {
          while (!conditionPredicate())
            lock.wait();
          // object is now in desired state
       }
  }

You have built the stop condition in statically. Normally, producers and consumers ought to be more flexible: they should be able to respond to an external signal to stop.

For starters, to implement an external stop signal, you can use a flag:

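class Producer implements Runnable {
     private volatile boolean stopRequested;
     private volatile Thread producerThread;

     public void run() {
        producerThread = Thread.currentThread();
        while (true) {
           if (stopRequested) {
              break; // get out of the loop
           }
           // ... produce the next item here ...
        }
     }

     public void stop() {
        stopRequested = true;
        // Interrupt the Producer thread too, so that it also wakes up
        // if it is currently blocked in wait(), sleep() or a queue put().
        Thread t = producerThread;
        if (t != null) {
           t.interrupt();
        }
     }
 }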

When you try to implement the above, you will probably run into other complications. For example, your producer first publishes and then wait()s, which can lead to problems.

If you are interested in further reading, I suggest the book Java Concurrency in Practice. It has far more recommendations than I can add here.

answered 2012-09-23T17:49:47.120

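Huge ambition! You asked this question about 8 years ago. I hope your efforts have given you (and continue to give you) the education you wanted.

These days, implementing multithreading in Java with wait(), notify() and join() is strongly discouraged. When you try to control concurrency at this low level, it is easy to shoot yourself in the foot. In fact, the Java designers admit that many of the methods and semantics of Thread were design mistakes, but they had to be left in for backward compatibility. The new "virtual threads" (Project Loom) change things again, but that is a different topic.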

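Today, the preferred way to start and control threads by hand is via ExecutorService.submit(Callable<V>), which returns a Future<V>. You can then wait for the task to finish (and get its return value) by calling Future<V>.get(). This returns the value of type V produced by the Callable, or throws ExecutionException if the Callable threw an uncaught exception.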
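The Callable/Future mechanics on their own fit in a small sketch: submit a Callable, block in Future.get() for its result, and unwrap ExecutionException when the task throws. FutureDemo, bakeAndWait and the value 42 are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureDemo {
    // Submits one Callable and blocks until its result (or its failure) is available.
    static String bakeAndWait(boolean burn) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> bake = () -> {
                if (burn) {
                    throw new IllegalStateException("cake burnt"); // uncaught in the task
                }
                return 42; // the task's return value
            };
            Future<Integer> future = executor.submit(bake);
            return "got " + future.get(); // blocks until the task finishes
        } catch (ExecutionException e) {
            // get() wraps whatever the Callable threw; getCause() is the original exception
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(bakeAndWait(false)); // got 42
        System.out.println(bakeAndWait(true));  // failed: cake burnt
    }
}
```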

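The class below is an example of how to implement this. It connects any number of producers to any number of consumers through a single bounded blocking queue. (The threads' return values are ignored. Each task is a lambda that simply returns null, which makes it a Callable and lets it throw InterruptedException from queue.put(). The resulting Future<?> is used only to wait for completion.)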

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class ProducerConsumer<E> {

    private final BlockingQueue<Optional<E>> queue;

    public ProducerConsumer(
            int numProducerThreads, int numConsumerThreads, int queueCapacity) {
        if (numProducerThreads < 1 || numConsumerThreads < 1 || queueCapacity < 1) {
            throw new IllegalArgumentException();
        }
        queue = new ArrayBlockingQueue<Optional<E>>(queueCapacity);
        final ExecutorService executor =
                Executors.newFixedThreadPool(numProducerThreads + numConsumerThreads);
        try {
            // Start producer threads. The live-producer count starts at the total
            // instead of being incremented inside each task; otherwise a producer
            // that finishes before the others have started could see it reach 0
            // and send the poison pills too early.
            final List<Future<?>> producerFutures = new ArrayList<>();
            final AtomicInteger numLiveProducers = new AtomicInteger(numProducerThreads);
            for (int i = 0; i < numProducerThreads; i++) {
                producerFutures.add(executor.submit(() -> {
                    // Run producer
                    producer();
                    // When the last producer finishes, deliver one poison pill per consumer
                    if (numLiveProducers.decrementAndGet() == 0) {
                        for (int j = 0; j < numConsumerThreads; j++) {
                            queue.put(Optional.empty());
                        }
                    }
                    return null;
                }));
            }
            // Start consumer threads
            final List<Future<?>> consumerFutures = new ArrayList<>();
            for (int i = 0; i < numConsumerThreads; i++) {
                consumerFutures.add(executor.submit(() -> {
                    // Run consumer
                    consumer();
                    return null;
                }));
            }
            // Wait for all producers to complete
            completionBarrier(producerFutures, false);
            // Wait for the consumers, each of which exits after taking a poison pill
            completionBarrier(consumerFutures, false);
        } finally {
            executor.shutdownNow();
        }
    }

    private static void completionBarrier(List<Future<?>> futures, boolean cancel) {
        for (Future<?> future : futures) {
            try {
                if (cancel) {
                    future.cancel(true);
                }
                future.get();
            } catch (CancellationException | InterruptedException e) {
                // Ignore
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    protected void produce(E val) {
        try {
            queue.put(Optional.of(val));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    protected Optional<E> consume() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /** Producer loop. Call {@link #produce(Object)} for each element. */
    public abstract void producer();

    /**
     * Consumer thread. Call {@link #consume()} to get each successive element,
     * until an empty {@link Optional} is returned.
     */
    public abstract void consumer();
}

Use it like this:

new ProducerConsumer<Integer>(/* numProducerThreads = */ 1, /* numConsumerThreads = */ 4,
        /* queueCapacity = */ 10) {
    @Override
    public void producer() {
        for (int i = 0; i < 100; i++) {
            System.out.println("Producing " + i);
            produce(i);
        }
    }

    @Override
    public void consumer() {
        // An empty Optional is the poison pill that ends this consumer
        for (Optional<Integer> opt; (opt = consume()).isPresent(); ) {
            int i = opt.get();
            System.out.println("Got " + i);
        }
    }
};
answered 2020-07-09T10:12:51.090