3

我有一个实现 Observer & Runnable 的类,如下所示(我知道这个例子可能很笨拙):

/**
 * Observer that sums the values pushed by an {@link Observable} and, when
 * executed as a {@link Runnable}, consumes messages from a blocking queue.
 *
 * <p>{@link #update} runs on whichever thread calls
 * {@code notifyObservers()} on the observable, while {@link #run} normally
 * runs on a thread of its own. Both need a source of data: if the observable
 * reads from the very same queue that is passed in here, {@link #run} competes
 * with it for every message.
 */
public class Triage implements Observer,Runnable{
    /** The observable this instance registered itself with. */
    Observable obsrvbl;
    /** Queue drained by {@link #run}. */
    private BlockingQueue<String> messages;
    /**
     * Stop flag shared by all instances. It is only checked between two
     * {@code take()} calls, so a thread that is currently blocked on an empty
     * queue must additionally be interrupted to terminate.
     */
    volatile static boolean interrupted=false;
    /** Running sum of the values received through {@link #update}. */
    double updated;

    /**
     * Creates the consumer and registers it as an observer of {@code obsrvbl}.
     *
     * @param obsrvbl  observable whose notifications are accumulated
     * @param messages queue that {@link #run} takes messages from
     */
    Triage(Observable obsrvbl, BlockingQueue<String> messages){
        this.obsrvbl=obsrvbl;
        this.messages = messages;
        obsrvbl.addObserver(this);
    }

    /**
     * Adds the notified value to {@link #updated} and logs the call.
     *
     * @param o   the observable that fired the notification
     * @param arg the notified value; must be a {@link Double}
     * @throws ClassCastException if {@code arg} is not a {@code Double}
     */
    @Override
    public void update(Observable o, Object arg){
        updated += ((Double)arg).doubleValue();
        System.out.println("updated");
    }

    /**
     * Takes messages from the queue until the shared stop flag is set or the
     * running thread is interrupted.
     */
    @Override
    public void run(){
        while(!interrupted){
            final String msg;
            try{
                // take() blocks until an element is available and never returns null.
                msg=messages.take();
            }catch(InterruptedException ie){
                // Restore the interrupt status for the caller and stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
            //do something with message
        }
    }
}

Triage 所读取的队列是在 Observable 调用 notifyObservers() 的同时被填充的。当队列为空时,Observer 的 update() 能被成功调用;但只要队列中有待处理的消息,update() 就永远不会被调用。这是预期的行为吗?

我看过一个类似的问题,但它描述的情况似乎与我的不同。

这是 Observable - 有点做作:

/**
 * Observable that takes quotes from a blocking queue and notifies its
 * observers once for every quote received.
 *
 * <p>Observers are called synchronously on the thread executing
 * {@link #run}.
 */
public class Producer extends Observable implements Runnable {
    /**
     * Stop flag shared by all instances. It is only checked between two
     * {@code take()} calls, so a thread blocked on an empty queue must
     * additionally be interrupted to terminate.
     */
    volatile static boolean interrupted=false;
    /** Queue the quotes are taken from. */
    private BlockingQueue<String> quotes;

    /**
     * @param quotes queue that {@link #run} takes quotes from
     */
    Producer(BlockingQueue<String> quotes){
        this.quotes=quotes;
    }

    /**
     * Takes quotes until the shared stop flag is set or the running thread is
     * interrupted, notifying all observers with the value {@code 3.0} for
     * each quote.
     */
    @Override
    public void run(){
        while(!interrupted){
            try{
                // take() blocks until an element is available and never
                // returns null; the quote itself is not used here.
                quotes.take();
            }catch(InterruptedException ie){
                // Restore the interrupt status for the caller and stop producing.
                Thread.currentThread().interrupt();
                return;
            }
            // notifyObservers() is a no-op unless the changed flag is set first.
            setChanged();
            notifyObservers(Double.valueOf(3.0));
        }
    }
}
4

1 回答 1

1

我已经弄清楚问题所在——是我自己的疏忽。正如 @slim 在评论中指出的那样,两个类使用的是同一个队列,因此 Triage 的 run() 抢在 Producer 的 run() 之前消费了消息,Producer 取不到消息,也就不会发出通知。无论如何,仅供参考,下面是一个完整的可运行示例:

/**
 * Observer that sums the integers pushed by an {@link Observable} and, when
 * executed as a {@link Runnable}, prints every message taken from its own
 * blocking queue.
 *
 * <p>{@link #update} runs on the thread that calls
 * {@code notifyObservers()} on the observable; {@link #run} normally runs on
 * a separate thread. The queue given to this class must not be the one the
 * observable reads from, otherwise the two compete for the same messages.
 */
public class Triage implements Observer,Runnable{
    /** The observable this instance registered itself with. */
    Observable obsrvbl;
    /** Queue drained by {@link #run}. */
    private BlockingQueue<String> messages;
    /**
     * Stop flag shared by all instances. It is only checked between two
     * {@code take()} calls, so a thread blocked on an empty queue must
     * additionally be interrupted to terminate.
     */
    volatile static boolean interrupted=false;
    /** Running sum of the values received through {@link #update}. */
    Integer updated = 0;
    /** Fixed seed so that the demo in {@link #main} is reproducible. */
    private static Random rand = new Random(47);

    /**
     * Creates the consumer and registers it as an observer of {@code obsrvbl}.
     *
     * @param obsrvbl  observable whose notifications are accumulated
     * @param messages queue that {@link #run} takes messages from
     */
    Triage(Observable obsrvbl, BlockingQueue<String> messages){
        this.obsrvbl=obsrvbl;
        this.messages = messages;
        obsrvbl.addObserver(this);
    }

    /**
     * Adds the notified value to {@link #updated} and prints the new total.
     *
     * @param o   the observable that fired the notification
     * @param arg the notified value; must be an {@link Integer}
     * @throws ClassCastException if {@code arg} is not an {@code Integer}
     */
    @Override
    public void update(Observable o, Object arg){
        updated += ((Integer)arg);
        System.out.println("Updated: " + updated);
    }

    /**
     * Prints each message taken from the queue until the shared stop flag is
     * set or the running thread is interrupted.
     */
    @Override
    public void run(){
        while(!interrupted){
            try{
                String msg=messages.take();
                System.out.println("Run: " + msg);
            }catch(InterruptedException ie){
                // Do not swallow the interrupt: restore the status and stop,
                // so the worker can actually be shut down via Thread.interrupt().
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Demo: starts a {@code Producer} and a {@code Triage} on separate
     * threads, each with its own queue, and feeds both queues the same 20
     * pseudo-random values.
     *
     * <p>Nothing in this method stops the two worker threads, so they keep
     * waiting for further input after the values have been processed.
     *
     * @param args ignored
     */
    public static void main(String[] args){
        BlockingQueue<String> q1 = new LinkedBlockingQueue<String>();
        BlockingQueue<String> q2 = new LinkedBlockingQueue<String>();
        Producer p = new Producer(q1);
        new Thread(p).start();
        new Thread(new Triage(p,q2)).start();
        for(int i=0;i<20;i++){
            int next = rand.nextInt(10)*500;
            System.out.println("Populating: " + next);
            String text = String.valueOf(next);
            q1.add(text);
            q2.add(text);
        }
    }
}

/**
 * Observable that takes numeric strings from a blocking queue and notifies
 * its observers with the parsed {@link Integer} for every quote received.
 *
 * <p>Observers are called synchronously on the thread executing
 * {@link #run}.
 */
class Producer extends Observable implements Runnable {
    /**
     * Stop flag shared by all instances. It is only checked between two
     * {@code take()} calls, so a thread blocked on an empty queue must
     * additionally be interrupted to terminate.
     */
    volatile static boolean interrupted=false;
    /** Queue the quotes are taken from. */
    private BlockingQueue<String> quotes;

    /**
     * @param quotes queue that {@link #run} takes quotes from
     */
    Producer(BlockingQueue<String> quotes){
        this.quotes=quotes;
    }

    /**
     * Takes quotes until the shared stop flag is set or the running thread is
     * interrupted, printing each quote and notifying all observers with its
     * integer value.
     *
     * @throws NumberFormatException if a quote is not a parsable integer
     */
    @Override
    public void run(){
        while(!interrupted){
            final String msg;
            try{
                // take() blocks until an element is available and never returns null.
                msg=quotes.take();
            }catch(InterruptedException ie){
                // Do not swallow the interrupt: restore the status and stop,
                // so the worker can actually be shut down via Thread.interrupt().
                Thread.currentThread().interrupt();
                return;
            }
            System.out.println("Notifying: " + msg);
            // notifyObservers() is a no-op unless the changed flag is set first.
            setChanged();
            notifyObservers(Integer.valueOf(msg));
        }
    }
}
于 2012-09-27T10:56:17.110 回答