我想实现一个共享集合类:由生产者线程向其中添加元素,再由消费者线程把元素打印出来。程序有时能处理到集合的第 0 个元素,但之后就再也不会继续。应用程序冻结后,我在 Eclipse 中能观察到“DestroyJVM”线程。生产者中加入了人为延迟,用来模拟“慢”生产者。我不明白为什么应用程序没有按如下顺序工作:“生产者获取集合类的锁,添加整数,消费者等待,生产者释放锁,消费者获取锁,消费者打印,消费者释放锁,生产者再次获取锁……”,如此循环。谁能指出错误在哪里?
这是我的代码:
import java.util.ArrayList;
import java.util.List;
import static java.lang.System.out;
/**
 * Entry point of the demo: shares a single {@link SharedIntegers} instance
 * between one producer and one consumer and runs each on its own named thread.
 */
public class SyncOwnCollMain {

    public static void main(String[] args) {
        final SharedIntegers shared = new SharedIntegers();

        final Producer producerTask = new Producer();
        final Consumer consumerTask = new Consumer();
        producerTask.setInts(shared);
        consumerTask.setInts(shared);

        final Thread producerThread = new Thread(producerTask, "ProducerThread");
        final Thread consumerThread = new Thread(consumerTask, "ConsumerThread");

        // Producer is started first; main() returns without joining either thread.
        producerThread.start();
        consumerThread.start();
    }
}
/**
 * List of integers shared between the producer and the consumer.
 * Every method is {@code synchronized} on this instance.
 */
class SharedIntegers {

    /** {@link #isFinished()} becomes true once the list holds more than this many elements. */
    private static final int LIMIT = 100;

    private final List<Integer> values = new ArrayList<Integer>();

    /** Inserts {@code integer} at index {@code i}. */
    public synchronized void addAtPosition(int i, Integer integer) {
        this.values.add(i, integer);
    }

    /** Returns the element stored at index {@code i}. */
    public synchronized Integer getAtPosition(int i) {
        final Integer found = this.values.get(i);
        return found;
    }

    /** Removes the element at index {@code i} and returns it. */
    public synchronized Integer removeAtPosition(int i) {
        final Integer removed = this.values.remove(i);
        return removed;
    }

    /** Returns the current number of stored elements. */
    public synchronized Integer getSize() {
        return Integer.valueOf(this.values.size());
    }

    /** Returns {@code true} when the element count exceeds the limit. */
    public synchronized boolean isFinished() {
        return this.values.size() > LIMIT;
    }
}
/**
 * Runnable that inserts the value {@code i * 3} at index {@code i} of the shared
 * collection, one element per loop pass, until {@code isFinished()} reports true.
 *
 * NOTE(review): neither Producer nor Consumer in this version ever calls
 * notify()/notifyAll() on the shared object, so the wait() below is never
 * signalled. That looks like the reason the program stalls after element 0.
 */
class Producer implements Runnable {
// Shared collection to fill; stays null until setInts() is called.
private SharedIntegers ints;
// Milliseconds to sleep after each insertion.
private int timeout = 100;
/** Returns the shared collection this producer writes to (may be null). */
public SharedIntegers getInts() {
return ints;
}
/** Sets the shared collection this producer writes to. */
public void setInts(SharedIntegers ints) {
this.ints = ints;
}
/** Produces elements in a loop; does nothing beyond the start message when no collection is set. */
@Override
public void run() {
out.println("Started ProducerThread");
if (getInts() != null) {
int i = 0;
Integer integer = null;
// The loop condition is evaluated outside the synchronized block below.
while (!getInts().isFinished()) {
// The shared object's monitor is acquired anew on every pass.
synchronized (getInts()) {
integer = i * 3;
getInts().addAtPosition(i, integer);
out.print("Producer added new integer = " + integer + " at " + i + " position");
out.println(". Will sleep now for " + timeout + " ms");
try {
// The sleep happens while the monitor is still held, so no other thread can enter a
// synchronized section on the shared object during the delay.
Thread.sleep(timeout);
// wait() releases the monitor and blocks until another thread notifies on the same object.
getInts().wait();
} catch (InterruptedException e) {
// The interruption is only printed; the loop then carries on.
e.printStackTrace();
}
i++;
}
}
}
}
}
/**
 * Runnable that prints the elements of the shared collection by increasing index.
 *
 * NOTE(review): the emptiness check in run() is evaluated only once, on entry. If the
 * collection is still empty at that moment, run() takes the else branch and the thread
 * ends without printing anything.
 */
class Consumer implements Runnable {
// Shared collection to read; stays null until setInts() is called.
private SharedIntegers ints;
/** Returns the shared collection this consumer reads from (may be null). */
public SharedIntegers getInts() {
return ints;
}
/** Sets the shared collection this consumer reads from. */
public void setInts(SharedIntegers ints) {
this.ints = ints;
}
/** Prints elements one index at a time, provided the collection is set and non-empty on entry. */
@Override
public void run() {
out.println("Started ConsumerThread");
// One-shot guard: the loop is entered only if at least one element already exists.
if (getInts() != null && getInts().getSize() > 0) {
int i = 0;
while (!getInts().isFinished()) {
synchronized (getInts()) {
showAtPosition(i, getInts());
i++;
try {
// NOTE(review): no code in this version calls notify()/notifyAll() on the shared
// object, so this wait() is not expected to return unless the thread is interrupted.
getInts().wait();
} catch (InterruptedException e) {
// The interruption is only printed; the loop then carries on.
e.printStackTrace();
}
Thread.yield();
}
}
} else {
// Collection missing or empty: yield once, then run() returns.
Thread.yield();
}
}
/** Prints the element stored at the given position of the given collection. */
private void showAtPosition(int position, SharedIntegers ints) {
out.println("sharedInts[" + position + "] -> " + ints.getAtPosition(position));
}
}
编辑:我重写了代码,现在它能按预期的方式工作,但 producerThread 和 consumerThread 都无法正常退出。有人知道这是为什么吗?
import java.util.ArrayList;
import java.util.List;
import static java.lang.System.out;
/**
 * Entry point of the second version: starts the consumer one second ahead of
 * the producer, then blocks until both threads have terminated.
 */
public class SyncOwnCollMain {

    public static void main(String[] args) {
        out.println("Main application started");

        final SharedIntegers shared = new SharedIntegers();
        final Producer producerTask = new Producer();
        final Consumer consumerTask = new Consumer();
        producerTask.setInts(shared);
        consumerTask.setInts(shared);

        final Thread producerThread = new Thread(producerTask, "ProducerThread");
        final Thread consumerThread = new Thread(consumerTask, "ConsumerThread");

        // Give the consumer a one-second head start over the producer.
        consumerThread.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producerThread.start();

        // main() does not continue until the consumer, then the producer, has ended.
        try {
            consumerThread.join();
            producerThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        out.println("Main application finished");
    }
}
/**
 * List of integers shared between the producer and the consumer.
 * Every method is {@code synchronized} on this instance.
 */
class SharedIntegers {

    /** Element count at which {@link #isFinished()} becomes true; also the bound used by {@link #overflow(int)}. */
    private static final int CAPACITY = 5;

    private final List<Integer> values = new ArrayList<Integer>();

    /** Inserts {@code integer} at index {@code i}. */
    public synchronized void addAtPosition(int i, Integer integer) {
        this.values.add(i, integer);
    }

    /** Returns the element stored at index {@code i}. */
    public synchronized Integer getAtPosition(int i) {
        final Integer found = this.values.get(i);
        return found;
    }

    /** Removes the element at index {@code i} and returns it. */
    public synchronized Integer removeAtPosition(int i) {
        final Integer removed = this.values.remove(i);
        return removed;
    }

    /** Returns the current number of stored elements. */
    public synchronized Integer getSize() {
        return Integer.valueOf(this.values.size());
    }

    /** Returns {@code true} once the element count has reached the capacity. */
    public synchronized boolean isFinished() {
        return this.values.size() >= CAPACITY;
    }

    /** Returns {@code true} when index {@code i} lies at or beyond the capacity. */
    public synchronized boolean overflow(int i) {
        return !(i < CAPACITY);
    }
}
/**
 * Runnable that inserts the value {@code i * 3} at index {@code i} of the shared
 * collection and hands control to the other thread after each insertion by means
 * of notify()/wait() on the shared object.
 */
class Producer implements Runnable {
// Shared collection to fill; stays null until setInts() is called.
private SharedIntegers ints;
// Milliseconds to sleep after being woken, before the next insertion.
private final int timeout = 500;
/** Returns the shared collection this producer writes to (may be null). */
public SharedIntegers getInts() {
return ints;
}
/** Sets the shared collection this producer writes to. */
public void setInts(SharedIntegers ints) {
this.ints = ints;
}
/** Produces elements until isFinished() is true, then waits once more on the shared object. */
@Override
public void run() {
out.println("Started ProducerThread");
if (getInts() != null) {
int i = 0;
Integer integer = null;
// The whole production loop runs inside one synchronized block on the shared object.
synchronized (getInts()) {
while (!getInts().isFinished()) {
integer = i * 3;
getInts().addAtPosition(i, integer);
out.print("Producer added new integer = " + integer + " at " + i + " position");
out.println(". Will sleep now for " + timeout + " ms");
try {
// Wake one thread waiting on the shared object, then release the monitor and wait in turn.
getInts().notify();
getInts().wait();
// Runs after wait() has returned, i.e. with the monitor held again.
Thread.sleep(timeout); // simulate "slow" producer
} catch (InterruptedException e) {
// The interruption is only printed; the loop then carries on.
e.printStackTrace();
}
i++;
}
try {
// NOTE(review): this final wait() is not preceded by a notify(). The consumer's last action
// in its loop is also a wait(), so it appears both threads remain blocked here,
// which would match the reported "threads do not exit" symptom.
getInts().wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
out.println("Finished ProducerThread");
}
}
/**
 * Runnable that prints the elements of the shared collection by increasing index,
 * handing control to the other thread after each pass by means of notify()/wait()
 * on the shared object.
 */
class Consumer implements Runnable {
// Shared collection to read; stays null until setInts() is called.
private SharedIntegers ints;
/** Returns the shared collection this consumer reads from (may be null). */
public SharedIntegers getInts() {
return ints;
}
/** Sets the shared collection this consumer reads from. */
public void setInts(SharedIntegers ints) {
this.ints = ints;
}
/** Prints one element per pass until overflow(i) reports that the index bound was reached. */
@Override
public void run() {
out.println("Started ConsumerThread");
if (getInts() != null) {
// The whole consumption loop runs inside one synchronized block on the shared object.
synchronized (getInts()) {
int i = 0;
while (!getInts().overflow(i)) {
// The guard only checks that the collection is non-empty, not that index i exists.
if (getInts().getSize() > 0) {
showAtPosition(i, getInts());
i++;
}
try {
// Wake one thread waiting on the shared object, then release the monitor and wait in turn.
// NOTE(review): after the last element is printed this wait() still executes, so leaving
// the loop depends on the producer issuing one more notify afterwards.
getInts().notify();
getInts().wait();
} catch (InterruptedException e) {
// The interruption is only printed; the loop then carries on.
e.printStackTrace();
}
}
}
}
out.println("Finished ConsumerThread");
}
/** Prints the element stored at the given position of the given collection. */
private void showAtPosition(int position, SharedIntegers ints) {
out.println("sharedInts[" + position + "] -> " + ints.getAtPosition(position));
}
}
编辑 2:找到了解决方案:producerThread 在完成工作之后,需要通知 consumerThread,使其可以重新获取 getInts() 对象的锁。下面是带有我的注释、可以正常运行的代码(另外增加了由 consumerThread 修改数据的逻辑):
import java.util.ArrayList;
import java.util.List;
import static java.lang.System.out;
/**
 * Entry point of the final version: starts the consumer one second ahead of
 * the producer and waits for both threads before printing the closing message.
 */
public class SyncOwnCollMain {

    public static void main(String[] args) {
        out.println("Main application started");

        final SharedIntegers shared = new SharedIntegers();
        final Producer producerTask = new Producer();
        final Consumer consumerTask = new Consumer();
        producerTask.setInts(shared);
        consumerTask.setInts(shared);

        final Thread producerThread = new Thread(producerTask, "ProducerThread");
        final Thread consumerThread = new Thread(consumerTask, "ConsumerThread");

        // The consumer is deliberately started first and left alone for one second.
        consumerThread.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producerThread.start();

        // Block until the consumer, then the producer, has terminated.
        try {
            consumerThread.join();
            producerThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // At this point both worker threads have finished with the shared collection.
        out.println("Main application finished");
    }
}
/**
 * List of integers shared between the producer and the consumer.
 * Every method is {@code synchronized} on this instance.
 */
class SharedIntegers {

    /** Element count at which {@link #isFinished()} becomes true; also the bound used by {@link #overflow(int)}. */
    private static final int CAPACITY = 5;

    private final List<Integer> values = new ArrayList<Integer>();

    /** Inserts {@code integer} at index {@code i}. */
    public synchronized void addAtPosition(int i, Integer integer) {
        this.values.add(i, integer);
    }

    /** Returns the element stored at index {@code i}. */
    public synchronized Integer getAtPosition(int i) {
        final Integer found = this.values.get(i);
        return found;
    }

    /** Removes the element at index {@code i} and returns it. */
    public synchronized Integer removeAtPosition(int i) {
        final Integer removed = this.values.remove(i);
        return removed;
    }

    /** Returns the current number of stored elements. */
    public synchronized Integer getSize() {
        return Integer.valueOf(this.values.size());
    }

    /** Returns {@code true} once the element count has reached the capacity. */
    public synchronized boolean isFinished() {
        return this.values.size() >= CAPACITY;
    }

    /** Returns {@code true} when index {@code i} lies at or beyond the capacity. */
    public synchronized boolean overflow(int i) {
        return !(i < CAPACITY);
    }
}
/**
 * Runnable that inserts the value {@code i * 3} at index {@code i} of the shared
 * collection, one element per turn, until {@code isFinished()} reports true.
 *
 * <p>Turns are exchanged with the consumer through {@code notifyAll()}/{@code wait()}
 * on the shared object; the whole loop runs while holding that object's monitor.
 */
class Producer implements Runnable {
    /** Shared collection to fill; stays null until {@link #setInts} is called. */
    private SharedIntegers ints;
    /** Milliseconds to sleep after being woken, to simulate a slow producer. */
    private final int timeout = 500;

    /** Returns the shared collection this producer writes to (may be null). */
    public SharedIntegers getInts() {
        return ints;
    }

    /** Sets the shared collection this producer writes to. */
    public void setInts(SharedIntegers ints) {
        this.ints = ints;
    }

    /**
     * Produces elements until the collection reports it is finished, then wakes
     * the waiting consumer one last time. Returns early, with the thread's
     * interrupt status restored, if the thread is interrupted.
     */
    @Override
    public void run() {
        out.println("Started ProducerThread");
        // Read the reference once so the lock object and the object used inside
        // the block are guaranteed to be the same instance.
        final SharedIntegers shared = getInts();
        if (shared == null) {
            return;
        }
        synchronized (shared) {
            try {
                for (int i = 0; !shared.isFinished(); i++) {
                    Integer integer = i * 3;
                    shared.addAtPosition(i, integer);
                    out.print("Producer added new integer = " + integer + " at " + i + " position");
                    out.println(". Will sleep now for " + timeout + " ms");
                    // Wake the threads waiting on the shared object, then release the
                    // monitor and wait for our next turn.
                    shared.notifyAll();
                    shared.wait();
                    // Runs with the monitor re-acquired; simulates a "slow" producer.
                    Thread.sleep(timeout);
                }
                out.println("Finished ProducerThread while() loop");
            } catch (InterruptedException e) {
                // Stop producing and keep the interrupt visible to the caller
                // instead of printing it and looping on.
                Thread.currentThread().interrupt();
            }
            // The consumer's last action is a wait(); without this final signal
            // it would never leave its loop. Issued on the interrupt path as well so
            // the consumer gets a chance to re-check its loop condition.
            shared.notifyAll();
        }
    }
}
/**
 * Runnable that, for each index in turn, prints the element of the shared
 * collection, replaces it with its value plus one, and prints it again.
 *
 * <p>Turns are exchanged with the producer through {@code notifyAll()}/{@code wait()}
 * on the shared object; the whole loop runs while holding that object's monitor.
 */
class Consumer implements Runnable {
    /** Shared collection to read and modify; stays null until {@link #setInts} is called. */
    private SharedIntegers ints;

    /** Returns the shared collection this consumer works on (may be null). */
    public SharedIntegers getInts() {
        return ints;
    }

    /** Sets the shared collection this consumer works on. */
    public void setInts(SharedIntegers ints) {
        this.ints = ints;
    }

    /**
     * Consumes one element per turn until {@code overflow(i)} reports that the
     * index bound was reached. Returns early, with the thread's interrupt status
     * restored, if the thread is interrupted.
     */
    @Override
    public void run() {
        out.println("Started ConsumerThread");
        // Read the reference once so the lock object and the object used inside
        // the block are guaranteed to be the same instance.
        final SharedIntegers shared = getInts();
        if (shared == null) {
            return;
        }
        int i = 0;
        synchronized (shared) {
            while (!shared.overflow(i)) {
                // Only touch index i once it actually exists. Checking "size > 0"
                // alone would throw IndexOutOfBoundsException if this thread woke
                // up before the producer had added the next element.
                if (shared.getSize() > i) {
                    out.println(showAtPosition(i, shared));
                    increaseAtPosition(i, shared);
                    out.println("After consumer increase : " + showAtPosition(i, shared));
                    i++;
                }
                try {
                    // Wake the threads waiting on the shared object, then release the
                    // monitor and wait for our next turn.
                    shared.notifyAll();
                    shared.wait();
                } catch (InterruptedException e) {
                    // Stop consuming and keep the interrupt visible to the caller
                    // instead of printing it and looping on.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            out.println("Finished ConsumerThread while() loop");
        }
    }

    /** Formats the element stored at {@code position} for display. */
    private String showAtPosition(int position, SharedIntegers ints) {
        return "sharedInts[" + position + "] -> " + ints.getAtPosition(position);
    }

    /** Replaces the element at {@code position} with its value plus one. */
    private void increaseAtPosition(int position, SharedIntegers ints) {
        // removeAtPosition already returns the old value, so no separate read is needed.
        Integer increased = ints.removeAtPosition(position) + 1;
        ints.addAtPosition(position, increased);
    }
}