0

我正在尝试java中的多线程示例。Java Complete reference 7th Edition 中有一个关于多线程同步的示例。该示例工作正常。但是当我稍微添加一行来创建同一类的另一个线程时,这不起作用。有人可以让我知道为什么会这样。示例如下。下面的代码是生产者和消费者的经典例子。如果有一个生产者,当我有 2 个生产者时它工作正常,那么它将失败。它直到 15 点才停止。

class Q {

    int n;
    boolean valueSet = false;

    synchronized int get() {
        while (!valueSet) {
            try {
                wait();
            } catch (InterruptedException e) {
                System.out.println("InterruptedException caught");
            }
        }
        System.out.println("Got: " + n);
        valueSet = false;
        notify();
        return n;
    }

    synchronized void put(int n) {
        while (valueSet) {
            try {
                wait();
            } catch (InterruptedException e) {
                System.out.println("InterruptedException caught");
            }
        }
        this.n = n;
        valueSet = true;
        System.out.println("Put: " + n);
        notify();
    }
}

class Producer implements Runnable {

    Q q;

    Producer(Q q) {
        this.q = q;
        new Thread(this, "Producer").start();
        //new Thread(this, "Producer2").start();
    }

    public void run() {
        int i = 0;
        while (true) {
            q.put(i++);
        }
    }
}

class Consumer implements Runnable {

    Q q;

    Consumer(Q q) {
        this.q = q;
        new Thread(this, "Consumer").start();
    }

    @Override
    public void run() {
        while (true) {
            q.get();
        }
    }
}

public class PCFixed {

    public static void main(String[] args) {
        Q q = new Q();
        Producer P1 = new Producer(q);
        new Consumer(q);
        Producer P2 = new Producer(q);
        System.out.println("Press Control-C to stop.");
    }
}
4

3 回答 3

1

Q 被写入一次只接受一个值。您需要更改put为布尔方法 - 如果 valueset 为 true 则返回 true 然后正常进行,如果 valueset 为 false 则返回 false 并且不执行任何操作返回。然后调用的方法put将需要不断重试,直到它们得到真正的响应。这样,多个消费者可以使用同一个 Q 对象而不会相互干扰。

如果您使用多个生产者,更好的解决方案是使用ConcurrentLinkedQueue,它是一个线程安全队列。生产者将offer整数加入队列,消费者将poll整数加入队列。多个生产者可以同时offer整数互不干扰,多个消费者可以同时poll整数互不干扰。

于 2013-04-12T23:35:02.630 回答
0

用 替换每个出现notifynotifyAll

于 2013-05-15T13:39:51.170 回答
0

您提供的并发示例使用单个boolean标志来检查是否有信号。

所以这更像是一种Semaphore安排,而不是生产者消费者安排。处理任意数量的Threads 太简单了。

如果您真的想使用生产者消费者,您将需要一个包含多个项目的队列。

static final AtomicBoolean run = new AtomicBoolean(true);

static class Producer implements Runnable {

    final BlockingQueue<String> blockingQueue;

    public Producer(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (run.get()) {
            blockingQueue.add("Value from " + Thread.currentThread().getName());
            try {
                Thread.sleep(100);
            } catch (InterruptedException ex) {
                //doesn't matter.
            }
        }
    }
}

static class Consumer implements Runnable {

    final BlockingQueue<String> blockingQueue;

    public Consumer(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (run.get()) {
            final String item;
            try {
                item = blockingQueue.take();
            } catch (InterruptedException ex) {
                return;
            }
            System.out.println(item);
        }
    }
}

public static void main(String[] args) throws InterruptedException {
    final LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue<>();
    final ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.submit(new Consumer(lbq));
    for (int i = 0; i < 10; ++i) {
        executorService.submit(new Producer(lbq));
    }
    Thread.sleep(10000);
    run.set(false);
    executorService.shutdownNow();
}

这个简单的例子使用 aLinkedBlockingQueue来发布事件和从中读取事件。

ProducerputsString以自己的名字进入队列(Thread它们每 100 毫秒执行一次)。Consumer从队列中取出并打印String.

队列是一个BlockingQueue,因此take如果队列为空,该方法将阻塞。

您可以通过更改将项目添加Producer到. 实验,看看它是如何工作的。ConsumerExecutorService

AtomicBoolean标志允许程序关闭所有产生的子进程。

于 2013-04-13T00:04:24.907 回答