0

我想要一个由生产者线程填充的共享集合类,输出由消费者线程显示。它有时使用集合类的 0 元素,但永远不会更进一步。在 Eclipse 中,我在应用程序冻结后观察“DestroyJVM”线程。生产者有人工延迟来模拟“慢”生产者。我不知道为什么应用程序没有按顺序工作,例如 “生产者获取集合类的锁,添加整数,消费者等待,生产者释放锁,消费者获取锁,消费者打印,消费者释放锁,生产者获取...... “等等。谁能指出错误在哪里?

这是我的代码:

import java.util.ArrayList;
import java.util.List;

import static java.lang.System.out;

public class SyncOwnCollMain {

    public static void main(String[] args) {
        SharedIntegers ints = new SharedIntegers();
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        producer.setInts(ints);
        consumer.setInts(ints);
        Thread producerThread = new Thread(producer);
        producerThread.setName("ProducerThread");
        Thread consumerThread = new Thread(consumer);
        consumerThread.setName("ConsumerThread");

        producerThread.start();
        consumerThread.start();
    }

}

class SharedIntegers {
    private final List<Integer> ints = new ArrayList<Integer>();
    private final int max = 100;

    public synchronized void addAtPosition(int i, Integer integer) {
            ints.add(i, integer);
    }
    public synchronized Integer getAtPosition(int i) {
            return ints.get(i);
    }
    public synchronized Integer removeAtPosition(int i) {
            return ints.remove(i);
    }
    public synchronized Integer getSize() {
            return ints.size();
    }
    public synchronized boolean isFinished() {
            return max < ints.size();
    }
}

class Producer implements Runnable {
    private SharedIntegers ints;
    private int timeout = 100;

    public SharedIntegers getInts() {
            return ints;
    }

    public void setInts(SharedIntegers ints) {
            this.ints = ints;
    }

    @Override
    public void run() {
        out.println("Started ProducerThread");
        if (getInts() != null) {
            int i = 0;
            Integer integer = null;
            while (!getInts().isFinished()) {
                synchronized (getInts()) {
                    integer = i * 3;
                    getInts().addAtPosition(i, integer);
                    out.print("Producer added new integer = " + integer + " at " + i + " position");
                    out.println(". Will sleep now for " + timeout + " ms");
                    try {
                        Thread.sleep(timeout);
                        getInts().wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
            }
        }
    }
}

class Consumer implements Runnable {
    private SharedIntegers ints;

    public SharedIntegers getInts() {
            return ints;
    }

    public void setInts(SharedIntegers ints) {
            this.ints = ints;
    }

    @Override
    public void run() {
        out.println("Started ConsumerThread");
        if (getInts() != null && getInts().getSize() > 0) {
            int i = 0;
            while (!getInts().isFinished()) {
                synchronized (getInts()) {
                    showAtPosition(i, getInts());
                    i++;
                    try {
                        getInts().wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Thread.yield();
                }
            }
        } else {
            Thread.yield();
        }
    }

    private void showAtPosition(int position, SharedIntegers ints) {
            out.println("sharedInts[" + position + "] -> " + ints.getAtPosition(position));
    }
}

编辑:我设法重写代码,以便它以所需的方式工作,但是,producerThread 和 consumerThread 不会正常退出。任何想法为什么?

import java.util.ArrayList;
import java.util.List;

import static java.lang.System.out;

public class SyncOwnCollMain {

    public static void main(String[] args) {
        out.println("Main application started");
        SharedIntegers ints = new SharedIntegers();
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        producer.setInts(ints);
        consumer.setInts(ints);
        Thread producerThread = new Thread(producer);
        producerThread.setName("ProducerThread");
        Thread consumerThread = new Thread(consumer);
        consumerThread.setName("ConsumerThread");

        consumerThread.start();
        try {
            Thread.sleep(1000); // simulate that consumerThread is "anxious" to start
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producerThread.start();
        try {
            consumerThread.join(); //let consumerThread finish before main()
            producerThread.join(); //let producerThread finish before main()
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        out.println("Main application finished");
    }

}

class SharedIntegers {
    private final List<Integer> ints = new ArrayList<Integer>();
    private final int max = 5;

    public synchronized void addAtPosition(int i, Integer integer) {
            ints.add(i, integer);
    }
    public synchronized Integer getAtPosition(int i) {
            return ints.get(i);
    }
    public synchronized Integer removeAtPosition(int i) {
            return ints.remove(i);
    }
    public synchronized Integer getSize() {
            return ints.size();
    }
    public synchronized boolean isFinished() {
            return max <= ints.size();
    }
    public synchronized boolean overflow(int i) {
            return i >= max;
    }
}

class Producer implements Runnable {
    private SharedIntegers ints;
    private final int timeout = 500;

    public SharedIntegers getInts() {
            return ints;
    }

    public void setInts(SharedIntegers ints) {
            this.ints = ints;
    }

    @Override
    public void run() {
        out.println("Started ProducerThread");
        if (getInts() != null) {
            int i = 0;
            Integer integer = null;
            synchronized (getInts()) {
                while (!getInts().isFinished()) {
                    integer = i * 3;
                    getInts().addAtPosition(i, integer);
                    out.print("Producer added new integer = " + integer + " at " + i + " position");
                    out.println(". Will sleep now for " + timeout + " ms");
                    try {
                        getInts().notify();
                        getInts().wait();
                        Thread.sleep(timeout); // simulate "slow" producer
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
                try {
                    getInts().wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        out.println("Finished ProducerThread");
    }
}

class Consumer implements Runnable {
    private SharedIntegers ints;

    public SharedIntegers getInts() {
            return ints;
    }

    public void setInts(SharedIntegers ints) {
            this.ints = ints;
    }

    @Override
    public void run() {
        out.println("Started ConsumerThread");
        if (getInts() != null) {
            synchronized (getInts()) {
                int i = 0;
                while (!getInts().overflow(i)) {
                    if (getInts().getSize() > 0) {
                        showAtPosition(i, getInts());
                        i++;
                    }
                    try {
                        getInts().notify();
                        getInts().wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        out.println("Finished ConsumerThread");
    }

    private void showAtPosition(int position, SharedIntegers ints) {
            out.println("sharedInts[" + position + "] -> " + ints.getAtPosition(position));
    }
}

编辑 2:找到解决方案:需要从 producerThread 通知 consumerThread 可以重新获取 getInts() 锁。带有我评论的工作代码如下所示(添加了一些由 consumerThread 修改的数据):

import java.util.ArrayList;
import java.util.List;

import static java.lang.System.out;

public class SyncOwnCollMain {

    public static void main(String[] args) {
        out.println("Main application started");
        SharedIntegers ints = new SharedIntegers();
        Producer producer = new Producer();
        Consumer consumer = new Consumer();
        producer.setInts(ints);
        consumer.setInts(ints);
        Thread producerThread = new Thread(producer);
        producerThread.setName("ProducerThread");
        Thread consumerThread = new Thread(consumer);
        consumerThread.setName("ConsumerThread");

        consumerThread.start();
        try {
            Thread.sleep(1000); // simulate that consumerThread is "anxious" to start
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producerThread.start();
        try {
            consumerThread.join(); //let consumerThread finish before main()
            producerThread.join(); //let producerThread finish before main()
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        out.println("Main application finished"); // here, main() thread has result produced by producerThread and consumerThread
    }

}

class SharedIntegers {
    private final List<Integer> ints = new ArrayList<Integer>();
    private final int max = 5;

    public synchronized void addAtPosition(int i, Integer integer) {
            ints.add(i, integer);
    }
    public synchronized Integer getAtPosition(int i) {
            return ints.get(i);
    }
    public synchronized Integer removeAtPosition(int i) {
            return ints.remove(i);
    }
    public synchronized Integer getSize() {
            return ints.size();
    }
    public synchronized boolean isFinished() {
            return max <= ints.size();
    }
    public synchronized boolean overflow(int i) {
            return i >= max;
    }
}

class Producer implements Runnable {
    private SharedIntegers ints;
    private final int timeout = 500;

    public SharedIntegers getInts() {
            return ints;
    }

    public void setInts(SharedIntegers ints) {
            this.ints = ints;
    }

    @Override
    public void run() {
        out.println("Started ProducerThread");
        if (getInts() != null) {
            int i = 0;
            Integer integer = null;
            synchronized (getInts()) {
                while (!getInts().isFinished()) {
                    integer = i * 3;
                    getInts().addAtPosition(i, integer);
                    out.print("Producer added new integer = " + integer + " at " + i + " position");
                    out.println(". Will sleep now for " + timeout + " ms");
                    try {
                        getInts().notifyAll(); // notify all threads (in this case - consumer thread) that getInts() will be available for other threads to sync and other threads are legitimate to compete for locking getInts()
                        getInts().wait(); // release lock for getInts()
                        Thread.sleep(timeout); // simulate "slow" producer
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    i++;
                }
                out.println("Finished ProducerThread while() loop");
                getInts().notifyAll(); // after job is done, need to notify consumer thread that it can compete to obtain getInts() lock
            }
        }   
    }
}

class Consumer implements Runnable {
    private SharedIntegers ints;

    public SharedIntegers getInts() {
            return ints;
    }

    public void setInts(SharedIntegers ints) {
            this.ints = ints;
    }

    @Override
    public void run() {
        out.println("Started ConsumerThread");
        if (getInts() != null) {
            int i = 0;
            synchronized (getInts()) {
                while (!getInts().overflow(i)) {
                    if (getInts().getSize() > 0) {
                        out.println(showAtPosition(i, getInts()));
                        increaseAtPosition(i, getInts());
                        out.println("After consumer increase : " + showAtPosition(i, getInts()));
                        i++;
                    }
                    try {
                        getInts().notifyAll(); // notify all threads that other threads are legitimate to compete for getInts() lock
                        getInts().wait(); // release getInts() lock, wait for allowance notification
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                out.println("Finished ConsumerThread while() loop");
            }
        }
    }

    private String showAtPosition(int position, SharedIntegers ints) {
            return "sharedInts[" + position + "] -> " + ints.getAtPosition(position);
    }

    private void increaseAtPosition(int position, SharedIntegers ints) {
        Integer increased = ints.getAtPosition(position)+1;
        ints.removeAtPosition(position);
        ints.addAtPosition(position, increased);
    }
}
4

2 回答 2

0

在里面Producer,改变

getInts().wait()

getInts().notify()

于 2015-07-28T16:41:57.603 回答
0

您的调用会getInts().wait();导致每个 Thread 永远等待,因为您从不调用notify(),因此您的应用程序会冻结。请参阅 Javadoc 了解java.lang.Object.wait()java.lang.Object.notify()

于 2015-07-28T15:06:46.080 回答