0

我想在Java中实现类,它将等待来自不同线程的新数据,当他得到它时,这个类将处理它并再次等待新数据。我想只使用synchronized、wait、notifyAll命令来实现这一点。我尝试了一些变体:

1) 使用一个线程,通过命令 lockObject.wait() 等待。但是当所有活动线程完成工作时,该线程将永远等待。当然,我可以创建方法 stopProcess(),但它不安全,因为另一个程序员可能会忘记调用它。

2)使用一个守护线程,它不会工作,因为当所有活动线程完成他们的工作时,我的守护线程死了,但他可以有一些他必须处理的数据

3)当新数据到来时 - 创建新线程,它将处理数据。当线程处于活动状态时(他处理给定的数据),他将接收新数据。当没有数据到来并且所有旧数据都被处理时,线程完成工作。这个变体的减号是 - 当数据通过某个时期(当线程有时间处理旧数据并死亡时),将创建一个新线程。我认为这对性能或/和内存不利。我对吗?

是否可以只使用一个或两个(可能结合使用守护进程和活动线程)线程而不使用 stopProcess() 方法来解决我的问题?

这里有一些代码

我对阻塞队列的认识

public class BlockingQueue<T> {
    private Queue<T> queue = new LinkedList<T>();

    public void add(T el){
        synchronized (queue){
            queue.add(el);
        }
    }

    public T getFirst(){
        synchronized (queue){
            return queue.poll();
        }
    }

    public int getSize(){
        synchronized (queue){
            return queue.size();
        }
    }
}

数据类

public class Data {
    //some data

    public void process(){
        //process this data
    }
} 

代码的第一个变体

public class ProcessData {

    private BlockingQueue<Data> queue = new BlockingQueue<Data>();
    private boolean run = false;
    private Thread processThread;
    private Object lock = new Object();

    public synchronized void addData(Data data) throws Exception {
        if (run){
            if (data != null){
                queue.add(data);
                wakeUpToProcess();
            }
        }else{
            throw new Exception("");
        }
    }

    public synchronized void start() {
        if (!run){
            run = true;

            processThread = new Thread(new Runnable() {
                public void run() {

                    while (run || queue.getSize()!=0){

                        while(queue.getSize() == 0 && run){
                            //if stopProcess was not called
                            //and no active threads
                            //it will not die
                            waitForNewData();
                        }

                        Data cur;
                        while(queue.getSize() > 0){
                            cur = queue.getFirst();
                            cur.process();
                        }

                    }
                }
            });
            processThread.start();

        }
    }

    public synchronized void stopProcess() {
        if (run){
            run = false;
            wakeUpToProcess();
        }
    }

    private void waitForNewData(){
        try{
            synchronized (lock){
                lock.wait();
            }
        }catch (InterruptedException ex){
            ex.printStackTrace();
        }
    }

    private void wakeUpToProcess(){
        synchronized (lock){
            lock.notifyAll();
        }
    }
}

在第二个变体中,我将 processThread 作为守护进程。但是当活动线程死亡时,processThread 完成工作,但队列中有一些数据,我必须处理。

第三种变体

public class ProcessData {

    private BlockingQueue<Data> queue = new BlockingQueue<Data>();
    private boolean run = false;
    private Thread processThread = null;

    public synchronized void addData(Data data) throws Exception {
        if (run){
            if (data != null){
                queue.add(data);
                wakeExecutor();
            }
        }else{
            throw new Exception("ProcessData is stopped!");
        }
    }

    public synchronized void start() {
        if (!run){
            run = true;
        }
    }

    public synchronized void stopProcess() {
        if (run){
            run = false;
        }
    }

    public boolean isRunning(){
        return this.run;
    }

    protected void wakeExecutor(){
        if (processThread ==null || !processThread.isAlive()){
            processThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    Data cur;
                    while(queue.getSize() > 0){
                        cur = queue.getFirst();
                        cur.process();
                    }
                }
            });
            processThread.start();
        }
    }
}

重要的是,数据必须按照来自线程的顺序进行处理。

4

2 回答 2

1

You are seriously reinventing the wheel here. All you want is available in the JDK in the java.util.concurrent package.

Implement a producer-consumer pattern via a BlockingQueue, with your producers calling offer() and your consumer thread calling take(), which blocks until something's available.

That's it. You don't need, and you shouldn't be writing, all those classes you have written. These concurrent classes do all the locking and synchronization for you, and do it correctly too (which is not to be underestimated)

于 2012-11-04T11:24:53.027 回答
0

如果您不允许使用任何东西,java.util.concurrent那么您将不得不基于类似LinkedList. 我会将阻塞行为封装在队列中,例如(伪代码)

synchronized Data nextTask() {
  while(the linked list is empty) {
    wait()
  }
  remove and return head of the queue
}

synchronized void addTask(Data d) {
  add d to the queue
  notifyAll()
}

然后你可以有一个消费者线程不断地做这样的事情

while(true) {
  taskQueue.nextTask().process()
}

并且生产者线程调用taskQueue.addTask以将每个任务添加到队列中。如果您需要在最后正常关闭,那么您要么需要一些“哨兵值”来告诉消费者线程完成,要么找到某种Thread.interrupt()在正确时间调用的方法。

于 2012-11-04T21:06:40.900 回答