2

我正在用java编程,我有一个List<LogEntry> log在不同线程之间共享的。

这些“编写者”线程已经在它们之间同步,因此每次只有一个线程可以添加或删除元素log

然而,由于我试图实现的分布式算法,日志的一部分是“安全的”,这意味着它们既不能被作者也不能被读者修改(我将在下面介绍)。这部分log由字段 表示int committedIndex,初始化为 0 并单调递增。

总而言之,writer 修改logrange 中的元素(commitIndex,log.size()),而 reader 获取logrange 中包含的元素[0,commitIndex]。阅读器从第一个条目开始阅读,然后阅读下一个条目,直到达到log.get(commitIndex),然后它停止并进入睡眠状态,直到commitIndex增加。它更新一个lastApplied初始化为 0 并单调增加的字段,以记住logEntry他在睡觉前阅读的最后一个字段。

如您所见,无需同步读取器和写入器,因为它们访问log.

我的问题是:当增加时,我怎样才能“唤醒”读者的线程commitIndex?我需要这样的东西(由作家执行):

if(commitIndex is updated)
{
     //wake up reader
}

和读者:

public void run() {
    while(true){
        //go to sleeep...
        //now the reader is awaken!
        while(lastApplied<commitIndex){
            //do something with log.get(lastApplied)
            lastApplied++;
        }
    }

显然,我非常简化了我的代码,以便让您尽可能地了解我想要什么,如果不够清楚,我很抱歉(请随时向我询问)。谢谢!

4

2 回答 2

0

试试这个:

if(commitIndex is updated)
{
  //wake up reader
  synchronized(reader)
  {
    reader.notify();
  }
}
于 2015-03-04T12:55:25.000 回答
0

使用共享LinkedBlockingQueue<Integer>(在读者和所有作者之间)让每个作者向读者发出commitIndex变量已被修改的信号:

作家:

if (commitIndex is updated) {
    // wake up reader
    this.queue.add(commitIndex);
}

读者:

public void run() {
    while (true) {

        // take() puts this thread to sleep until a writer calls add()
        int commitIndex = this.queue.take();

        // now the reader is awaken!
        while (lastApplied < commitIndex) {
            // do something with log.get(lastApplied)
            lastApplied++;
        }
    }
}

在这里,我为读者和所有作者使用了属性queue,它应该对应于 的同一个实例。LinkedBlockingQueue

注意:异常处理留作练习。

于 2015-03-04T14:35:11.787 回答