我有一个共享缓冲区的生产者和消费者。我希望能够运行、暂停、继续运行和停止线程。
我尝试的是保留一个 Enum 标志来指示状态,每次我生产一个新项目时,我都会在 if-else 中检查状态。如果状态正在运行,我继续运行,如果它正在等待,我让线程等待。这很有效,因为生产者和消费者总是在工作(即生产者可以将项目放入缓冲区,而消费者总是可以在缓冲区中找到项目)。但是,一旦我遇到其中一个线程因为缓冲区已满或为空而相互等待的问题,那么程序的整个逻辑就会被搞砸,我完全无法解决它。我已经为此工作了4天,但仍然没有希望。如果有人能帮助我解决这个问题,我将非常感激。谢谢!
要运行、暂停、继续运行和停止线程,我使用的是 GUI。它通过 pause() 将线程置于等待状态......然后一旦我想再次运行它们,我就唤醒()线程并通知它们。与 die() 相同。
编辑: 到目前为止的逻辑问题是,当我单击按钮继续时,线程的内部状态仍然是“WAITING”,而它们应该是“RUNNING”。这就是 GUI 阻塞的原因。
这是我的生产者代码:
public class GProducer2 implements Runnable {
private volatile ThreadState state;
private volatile ThreadState innerState;
private BlockingQueue<Integer> buffer;
private static Queue<Integer> stream = new LinkedList<Integer>();
static {
for ( int i = 0; i <= 1000; i++ ) {
stream.add(i);
}
}
public GProducer2( BlockingQueue<Integer> buffer ) {
this.buffer = buffer;
state = ThreadState.RUNNING;
// innerState = ThreadState.RUNNING;
}
@Override
public void run() {
/*
* The first while loop is to keep getting items from the stream
*/
while( state != ThreadState.DYING ) {
if ( !stream.isEmpty() ) {
int item = stream.poll();
/*
* The second while loop is to not lose items if the
* thread has to wait, so it process the item when the thread
* is running again.
*/
while( state != ThreadState.DYING ) {
if ( state == ThreadState.RUNNING ) {
//Check to see if buffer has free space
boolean freeBuffer = false;
synchronized (buffer) {
freeBuffer = buffer.offer(item);
}
while ( (!freeBuffer) && (state == ThreadState.RUNNING)) {
//if it doesn't, then wait...
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait(100);
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
//check to see if the buffer has free space now
synchronized (buffer) {
freeBuffer = buffer.offer(item);
}
}
if ( (freeBuffer) && (state == ThreadState.RUNNING) ) {
synchronized (this) {
innerState = ThreadState.RUNNING;
}
//... continue with the stuff if you need
//...
//System.out.println(item);
//..then break
break;
}
}
else if ( state == ThreadState.WAITING ) {
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait();
} catch (InterruptedException e) {
//e.printStackTrace();
//innerState = ThreadState.RUNNING;
}
}
}
}
}//when the stream is done.
else if ( state == ThreadState.WAITING ) {
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait();
} catch (InterruptedException e) {
//e.printStackTrace();
if ( state == ThreadState.WAITING )
innerState = ThreadState.RUNNING;
else
innerState = ThreadState.DYING;
}
}
}
}
synchronized (this) {
innerState = ThreadState.DYING;
}
}
public void pause() {
synchronized (this) {
state = ThreadState.WAITING;
}
}
public void die() {
synchronized (this) {
state = ThreadState.DYING;
}
}
public void wakeup() {
synchronized (this) {
state = ThreadState.RUNNING;
}
}
public ThreadState getState() {
return state;
}
public ThreadState getInnerState() {
return innerState;
}
public boolean isSynched() {
synchronized (this) {
if ( state == innerState )
return true;
else
return false;
}
}
}
这是我的消费者的代码:
public class GConsumer implements Runnable {
private volatile ThreadState state;
private volatile ThreadState innerState;
private BlockingQueue<Integer> buffer;
private List<Integer> holder;
public GConsumer( BlockingQueue<Integer> buffer ) {
this.buffer = buffer;
state = ThreadState.RUNNING;
holder = new LinkedList<Integer>();
}
@Override
public void run() {
/*
* The first while loop is to keep getting items from the buffer
*/
while( state != ThreadState.DYING ) {
if ( state == ThreadState.RUNNING ) {
//if the buffer has items then process them
boolean emptyBuffer = true;
synchronized (buffer) {
emptyBuffer = buffer.isEmpty();
}
if ( !emptyBuffer ) {
//Start doing your stuff
innerState = ThreadState.RUNNING;
int item;
synchronized (buffer) {
item = buffer.poll();
}
holder.add(item);
}
//otherwise the thread waits for the buffer to get items
else {
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait(100);
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
}
}
else if ( state == ThreadState.WAITING ) {
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait();
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
}
}
synchronized (this) {
innerState = ThreadState.DYING;
}
}
public void pause() {
synchronized (this) {
state = ThreadState.WAITING;
}
}
public void die() {
synchronized (this) {
state = ThreadState.DYING;
}
}
public void wakeup() {
synchronized (this) {
state = ThreadState.RUNNING;
}
}
public ThreadState getState() {
return state;
}
public synchronized List<Integer> getHolder() {
return holder;
}
public ThreadState getInnerState() {
return innerState;
}
public boolean isSynched() {
synchronized (this) {
if ( state == innerState )
return true;
else
return false;
}
}
}
这是我的 GUI 的代码:
public class GController implements ActionListener, ItemListener {
ExecutorService executor = Executors.newCachedThreadPool();
private final BlockingQueue<Integer> buffer = new LinkedBlockingQueue<Integer>(10);
private volatile AppState appState = AppState.CLEAN_START;
private GProducer2 producer;
private GConsumer consumer;
//GUI stuff
static JToggleButton startBtn;
static JButton stopBtn;
static JButton showBtn;
public static void main(String[] args) {
SwingUtilities.invokeLater(new Runnable() {
@Override
public void run() {
createAndShowGUI();
}
});
}
public GController() {
// producer = new GProducer2(buffer);
// consumer = new GConsumer(buffer);
}
private static void createAndShowGUI() {
GController gController = new GController();
JFrame frame = new JFrame("GUI Concurrency");
frame.setPreferredSize( new Dimension(400, 200));
frame.setLayout( new FlowLayout() );
frame.setDefaultCloseOperation( JFrame.EXIT_ON_CLOSE );
startBtn = new JToggleButton("Start");
startBtn.addItemListener(gController);
stopBtn = new JButton("Cancel");
stopBtn.setEnabled(false);
stopBtn.setActionCommand("Cancel");
stopBtn.addActionListener(gController);
showBtn = new JButton("Show");
showBtn.setActionCommand("Show");
showBtn.addActionListener(gController);
frame.getContentPane().add(startBtn);
frame.getContentPane().add(stopBtn);
frame.getContentPane().add(showBtn);
frame.pack();
frame.setVisible(true);
}
@Override
public void actionPerformed(ActionEvent e) {
String command = e.getActionCommand();
System.out.println(command + " is clicked");
if ( command.equals("Cancel") ) {
startBtn.setText("Start");
appState = AppState.CLEAN_START;
producer.die();
consumer.die();
synchronized (producer) {
producer.notify();
}
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
synchronized (consumer) {
consumer.notify();
}
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
//Block here until they are both dead;
consumer.getHolder().clear();
executor.shutdown();
}
else if ( command.equals("Show") ) {
for ( int i : consumer.getHolder() ) {
System.out.println("[" + i + "]");
}
System.out.println();
}
}
@Override
public void itemStateChanged(ItemEvent e) {
if ( e.getStateChange() == ItemEvent.SELECTED ) {
if ( appState == AppState.CLEAN_START) {
System.out.println("Start");
startBtn.setText("Pause");
appState = AppState.RUNNING;
executor = Executors.newCachedThreadPool();
producer = new GProducer2(buffer);
consumer = new GConsumer(buffer);
executor.execute( producer );
executor.execute( consumer );
executor.shutdown();
stopBtn.setEnabled(false);
}
//Now continue execution
else if ( appState == AppState.PAUSED ) {
System.out.println("Continue");
appState = AppState.RUNNING;
producer.wakeup();
synchronized (producer) {
producer.notify();
}
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
consumer.wakeup();
synchronized (consumer) {
consumer.notify();
}
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
//block the app here until they are really running
// while( !producer.isSynched() ) {
// }
while( !producer.isSynched() | !consumer.isSynched() ) {
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
}
startBtn.setText("Pause");
stopBtn.setEnabled(false);
}
}
else {
System.out.println("Pause");
startBtn.setText("Continue");
appState = AppState.PAUSED;
System.out.println("Before:");
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
producer.pause();
consumer.pause();
//Block the app here until they are really waiting
System.out.println("After:");
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
while( !producer.isSynched() | !consumer.isSynched() ) {
}
stopBtn.setEnabled(true);
}
}
}