1

以下是我当前代码的简化版本。我很确定我在语法方面没有做任何错误的事情,而且我找不到我的概念错误。

这是我试图实现的一种观察者模式。我负担不起从 Java.utils.observable 继承,因为我的类已经很复杂并且从另一个类继承。这里有两个部分:

有一个 Notifier 类实现 Runnable :

public class Notifier implements Runnable{

    public void run()
    {
        while(true)
        {
            MyDataType data = getData();
            if(data.isChanged()==true)
            {
                refresh();
            }
        }
    }
}

然后是我的主类,它需要响应对 MyDataType 数据的更改。

public class abc {

    private MyDataType data;

    public void abc(){
               Notifier notifier = new Notifier();
               Thread thread = new Thread(notifier);
               thread.start();
          }     


    public MyDataType getData(){
              return this.data;
    }


    public void refresh(){
         MyDatatype data = getData();
     //Do something with data
    }
}

问题:发生的事情是通知程序在“数据”更改时调用 refresh()。但是在 refresh() 中,当我执行 getData() 时,我得到的是旧版本的“数据”!我应该提一下,代码的其他部分也调用了 refresh() 函数。

  • 我在看什么?
  • 这个问题还有其他更好的解决方案吗?
  • 如果我不能应用开箱即用的默认 Java 实现,我应该如何设计 Subject-Observer 系统?
4

2 回答 2

4

当我执行 getData() 时,我得到的是旧版本的“数据”!

您的data字段在多个线程之间共享,因此必须用volatile关键字标记。

private volatile MyDataType data;

这会导致读取和写入周围的“内存屏障”使值对所有线程可见。即使通知线程正在调用getData(),如果它的内存缓存,它的值data也会被检索出来。如果没有内存屏障,data值将随机更新或从不更新。

正如评论中提到的@JB,volatile可以保护您免受重新分配该data字段的影响。如果您更新当前 data值中的某个字段,则不会越过内存屏障,即不会更新通知程序的内存。

回顾你的代码,看起来是这样的:

if(data.isChanged()==true)
{
    refresh();
}

如果data没有被分配给一个新对象,那么制作datavolatile无济于事。你不得不:

  • volatile boolean dirty;每当data更新时设置某种字段。
  • 每次都data在一个块内更新或读取。synchronize
于 2012-06-11T18:55:00.540 回答
1

首先,您的data变量可能会被缓存,因此您总是需要通过 make 来获取最新值volatile

其次,您在这里所做的是生产者/消费者模式。这种模式通常最好用消息来实现。当您收到新数据时,您可以创建一个不可变对象并将其发布到消费者线程(通过像BlockingQueue这样的线程安全队列),而不是使用共享变量。

这些方面的东西:

public class Notifier extends Thread{
   private BlockingQueue<E> consumerQueue = null;
   public setConsumerQueue(BlockingQueue<E> val){
      consumerQueue = val;
   }
   // main method where data is received from socket...
   public void run(){
      while(!interrupted()){
           data = ... // got new data here
           if(!data.isChanged()) continue;
           // Post new data only when it has changed
           if(consumerQueue!=null) consumerQueue.offer(data);
      }
   }
}

public class Consumer extends Thread{
   private BlockingQueue<E> consumerQueue = new BlockingQueue<E>();
   public Consumer (Producer val){
      val.setConsumerQueue(consumerQueue);
   }
   public void run(){
      while(!interrupted()){
           data = consumerQueue.take();// block until there is data from producer
           if(data !=null) processData(data);
      }
   }
}
于 2012-06-11T19:14:49.683 回答