2

我有一个共享缓冲区的生产者和消费者。我希望能够运行、暂停、继续运行和停止线程。

我尝试的是保留一个 Enum 标志来指示状态,每次我生产一个新项目时,我都会在 if-else 中检查状态。如果状态正在运行,我继续运行,如果它正在等待,我让线程等待。这很有效,因为生产者和消费者总是在工作(即生产者可以将项目放入缓冲区,而消费者总是可以在缓冲区中找到项目)。但是,一旦我遇到其中一个线程因为缓冲区已满或为空而相互等待的问题,那么程序的整个逻辑就会被搞砸,我完全无法解决它。我已经为此工作了4天,但仍然没有希望。如果有人能帮助我解决这个问题,我将非常感激。谢谢!

要运行、暂停、继续运行和停止线程,我使用的是 GUI。它通过 pause() 将线程置于等待状态......然后一旦我想再次运行它们,我就唤醒()线程并通知它们。与 die() 相同。

编辑: 到目前为止的逻辑问题是,当我单击按钮继续时,线程的内部状态仍然是“WAITING”,而它们应该是“RUNNING”。这就是 GUI 阻塞的原因。

这是我的生产者代码:

public class GProducer2 implements Runnable {


    private volatile ThreadState state;
    private volatile ThreadState innerState;
    private BlockingQueue<Integer> buffer;

    private static Queue<Integer> stream = new LinkedList<Integer>();
    static {
        for ( int i = 0; i <= 1000; i++ ) {
            stream.add(i);
        }
    }

    public GProducer2( BlockingQueue<Integer> buffer ) {
        this.buffer = buffer;
        state = ThreadState.RUNNING;
//      innerState = ThreadState.RUNNING;
    }

    @Override
    public void run() {
        /*
         * The first while loop is to keep getting items from the stream
         */
        while( state != ThreadState.DYING ) {
            if ( !stream.isEmpty() ) {
                int item = stream.poll();
                /*
                 * The second while loop is to not lose items if the 
                 * thread has to wait, so it process the item when the thread
                 * is running again.
                 */
                while( state != ThreadState.DYING ) {

                    if ( state == ThreadState.RUNNING ) {
                        //Check to see if buffer has free space
                        boolean freeBuffer = false;
                        synchronized (buffer) {
                            freeBuffer = buffer.offer(item);
                        }

                        while ( (!freeBuffer) && (state == ThreadState.RUNNING)) {
                            //if it doesn't, then wait...
                            synchronized (this) {
                                try {
                                    innerState = ThreadState.WAITING;
                                    wait(100);
                                } catch (InterruptedException e) {
                                    //e.printStackTrace();
                                }
                            }
                            //check to see if the buffer has free space now
                            synchronized (buffer) {
                                freeBuffer = buffer.offer(item);
                            }
                        }

                        if ( (freeBuffer) && (state == ThreadState.RUNNING) ) {
                            synchronized (this) {
                                innerState = ThreadState.RUNNING;
                            }
                            //... continue with the stuff if you need
                            //...
                            //System.out.println(item);
                            //..then break
                            break;
                        }

                    }
                    else if ( state == ThreadState.WAITING ) {
                        synchronized (this) {
                            try {
                                innerState = ThreadState.WAITING;
                                wait();
                            } catch (InterruptedException e) {
                                //e.printStackTrace();
                                //innerState = ThreadState.RUNNING;
                            }
                        }
                    }
                }
            }//when the stream is done.
            else if ( state == ThreadState.WAITING ) {
                synchronized (this) {
                    try {
                        innerState = ThreadState.WAITING;
                        wait();
                    } catch (InterruptedException e) {
                        //e.printStackTrace();
                        if ( state == ThreadState.WAITING )
                            innerState = ThreadState.RUNNING;
                        else
                            innerState = ThreadState.DYING;
                    }
                }
            }
        }

        synchronized (this) {
            innerState = ThreadState.DYING;
        }
    }

    public void pause() {
        synchronized (this) {
            state = ThreadState.WAITING;
        }
    }

    public void die() {
        synchronized (this) {
            state = ThreadState.DYING;
        }
    }

    public void wakeup() {
        synchronized (this) {
            state = ThreadState.RUNNING;
        }
    }

    public ThreadState getState() {
        return state;
    }


    public ThreadState getInnerState() {
        return innerState;
    }

    public boolean isSynched() {
        synchronized (this) {
            if ( state == innerState )
                return true;
            else 
                return false;
        }
    }

}

这是我的消费者的代码:

public class GConsumer implements Runnable {

    private volatile ThreadState state;
    private volatile ThreadState innerState;
    private BlockingQueue<Integer> buffer;
    private List<Integer> holder;

    public GConsumer( BlockingQueue<Integer> buffer ) {
        this.buffer = buffer;
        state = ThreadState.RUNNING;
        holder = new LinkedList<Integer>();
    }

    @Override
    public void run() {
        /*
         * The first while loop is to keep getting items from the buffer
         */
        while( state != ThreadState.DYING ) {
            if ( state == ThreadState.RUNNING ) {
                //if the buffer has items then process them
                boolean emptyBuffer = true;
                synchronized (buffer) {
                    emptyBuffer = buffer.isEmpty();
                }

                if ( !emptyBuffer ) {
                    //Start doing your stuff
                    innerState = ThreadState.RUNNING;
                    int item;
                    synchronized (buffer) {
                        item = buffer.poll();
                    }

                    holder.add(item);

                }
                //otherwise the thread waits for the buffer to get items
                else {
                    synchronized (this) {
                        try {
                            innerState = ThreadState.WAITING;
                            wait(100);
                        } catch (InterruptedException e) {
                            //e.printStackTrace();
                        }
                    }
                }

            }
            else if ( state == ThreadState.WAITING ) {
                synchronized (this) {
                    try {
                        innerState = ThreadState.WAITING;
                        wait();
                    } catch (InterruptedException e) {
                        //e.printStackTrace();
                    }
                }
            }
        }

        synchronized (this) {
            innerState = ThreadState.DYING;
        }

    }

    public void pause() {
        synchronized (this) {
            state = ThreadState.WAITING;
        }
    }

    public void die() {
        synchronized (this) {
            state = ThreadState.DYING;
        }
    }

    public void wakeup() {
        synchronized (this) {
            state = ThreadState.RUNNING;
        }
    }

    public ThreadState getState() {
        return state;
    }

    public synchronized List<Integer> getHolder() {
        return holder;
    }

    public ThreadState getInnerState() {
        return innerState;
    }

    public boolean isSynched() {
        synchronized (this) {
            if ( state == innerState )
                return true;
            else 
                return false;
        }
    }
}

这是我的 GUI 的代码:

public class GController implements ActionListener, ItemListener {

    ExecutorService executor = Executors.newCachedThreadPool();
    private final BlockingQueue<Integer> buffer = new LinkedBlockingQueue<Integer>(10);
    private volatile AppState appState = AppState.CLEAN_START;

    private GProducer2 producer;
    private GConsumer consumer;

    //GUI stuff
    static JToggleButton startBtn;
    static JButton stopBtn;
    static JButton showBtn;

    public static void main(String[] args) {
        SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {
                createAndShowGUI();
            }
        });
    }

    public GController() {
//      producer = new GProducer2(buffer);
//      consumer = new GConsumer(buffer);
    }

    private static void createAndShowGUI() {
        GController gController = new GController();
        JFrame frame = new JFrame("GUI Concurrency");
        frame.setPreferredSize( new Dimension(400, 200));
        frame.setLayout( new FlowLayout() );
        frame.setDefaultCloseOperation( JFrame.EXIT_ON_CLOSE );

        startBtn = new JToggleButton("Start");
        startBtn.addItemListener(gController);

        stopBtn = new JButton("Cancel");
        stopBtn.setEnabled(false);
        stopBtn.setActionCommand("Cancel");
        stopBtn.addActionListener(gController);

        showBtn = new JButton("Show");
        showBtn.setActionCommand("Show");
        showBtn.addActionListener(gController);

        frame.getContentPane().add(startBtn);
        frame.getContentPane().add(stopBtn);
        frame.getContentPane().add(showBtn);

        frame.pack();
        frame.setVisible(true);
    }


    @Override
    public void actionPerformed(ActionEvent e) {
        String command = e.getActionCommand();
        System.out.println(command + " is clicked");

        if ( command.equals("Cancel") ) {
            startBtn.setText("Start");
            appState = AppState.CLEAN_START;

            producer.die();
            consumer.die();

            synchronized (producer) {
                producer.notify();
            }
            System.out.println( "P:" + producer.getState()  );
            System.out.println( "P inner:" + producer.getInnerState()  );

            synchronized (consumer) {
                consumer.notify();
            }
            System.out.println( "C:" + consumer.getState()  );
            System.out.println( "C inner:" + consumer.getInnerState()  );

            //Block here until they are both dead;

            consumer.getHolder().clear();
            executor.shutdown();
        }
        else if ( command.equals("Show") ) {
            for ( int i : consumer.getHolder() ) {
                System.out.println("[" + i + "]");
            }
            System.out.println();
        }

    }

    @Override
    public void itemStateChanged(ItemEvent e) {
        if ( e.getStateChange() == ItemEvent.SELECTED ) {

            if ( appState == AppState.CLEAN_START) {
                System.out.println("Start");
                startBtn.setText("Pause");
                appState = AppState.RUNNING;
                executor = Executors.newCachedThreadPool();
                producer = new GProducer2(buffer);
                consumer = new GConsumer(buffer);
                executor.execute( producer );
                executor.execute( consumer );
                executor.shutdown();
                stopBtn.setEnabled(false);
            }
            //Now continue execution
            else if ( appState == AppState.PAUSED ) {
                System.out.println("Continue");
                appState = AppState.RUNNING;
                producer.wakeup();
                synchronized (producer) {
                    producer.notify();
                }

                System.out.println( "P:" + producer.getState()  );
                System.out.println( "P inner:" + producer.getInnerState()  );

                consumer.wakeup();
                synchronized (consumer) {
                    consumer.notify();
                }

                System.out.println( "C:" + consumer.getState()  );
                System.out.println( "C inner:" + consumer.getInnerState()  );

                //block the app here until they are really running
//              while( !producer.isSynched() ) {
//              }

                while( !producer.isSynched() | !consumer.isSynched() ) {
                    System.out.println( "P:" + producer.getState()  );
                    System.out.println( "P inner:" + producer.getInnerState()  );

                    System.out.println( "C:" + consumer.getState()  );
                    System.out.println( "C inner:" + consumer.getInnerState()  );
                }

                startBtn.setText("Pause");
                stopBtn.setEnabled(false);
            }
        }
        else {
            System.out.println("Pause");
            startBtn.setText("Continue");
            appState = AppState.PAUSED;

            System.out.println("Before:");

            System.out.println( "P:" + producer.getState()  );
            System.out.println( "P inner:" + producer.getInnerState()  );
            System.out.println( "C:" + consumer.getState()  );
            System.out.println( "C inner:" + consumer.getInnerState()  );

            producer.pause();
            consumer.pause();

            //Block the app here until they are really waiting
            System.out.println("After:");
            System.out.println( "P:" + producer.getState()  );
            System.out.println( "P inner:" + producer.getInnerState()  );
            System.out.println( "C:" + consumer.getState()  );
            System.out.println( "C inner:" + consumer.getInnerState()  );
            while( !producer.isSynched() | !consumer.isSynched() ) {
            }
            stopBtn.setEnabled(true);
        }
    }

}
4

1 回答 1

0

这里的代码太多了,但我可以看到同步原语肯定没有正确使用(比如在唤醒后不检查条件,声明一个字段volatile并用它来保护它synchronized等)。

众所周知,同步原语很难正确使用,如果我是你,我会使用更高级别的构造,而不是尝试修复当前的方法。

我不确定您要完成什么,但我想您想“运行”、“暂停”和“停止”处理。要实现“运行”,请使用简单的标准生产者消费者构造和有界队列(参见此示例)。要实现“暂停”,请让您的任务引用共享Semaphore并在您的任务调用内部acquire,如下Semaphore所示:

final Semaphore signal = new Semaphore(1);
final ExecutionService service = // create one with a bounded queue
                                 // and a CallerRuns policy

void submit(final Callable<T> work){
     Callable<T> wrapped = new Callable<T>(){
         public T call() throws Exception(){
             signal.acquire();
             return work.call();
         }
     };
     service.submit(wrapped);
}

*:代码仅用于说明目的,并不意味着编译;但你会明白的

然后您可以通过在drainPermits. 为了“停止”,只需调用releasesignalshutdownExecutionService

于 2013-10-05T14:47:06.093 回答