8

我有一个编写器线程和一个读取器线程来更新和处理数组池(存储在映射中的引用)。写入与读取的比率几乎为 5:1(写入延迟是一个问题)。

编写器线程需要根据某些事件更新池中数组的少数元素。整个写操作(所有元素)需要是原子的。

如果写入器线程正在更新它,我想确保读取器线程读取先前更新的数组(类似于 volatile 但在整个数组而不是单个字段上)。基本上,我可以负担得起读取过时的值,但不会阻塞。

此外,由于写入如此频繁,因此在读/写时创建新对象或锁定整个数组会非常昂贵。

是否有更有效的数据结构可以使用或使用更便宜的锁?

4

8 回答 8

2

这个想法怎么样:编写器线程不会改变数组。它只是将更新排队。

读取器线程在进入需要阵列稳定快照的读取会话时,将排队更新应用到阵列,然后读取阵列。

class Update
{
    int position;
    Object value;
}

ArrayBlockingQueue<Update> updates = new ArrayBlockingQueue<>(Integer.MAX_VALUE);

void write()
{
    updates.put(new Update(...));
}

Object[] read()
{
    Update update;
    while((update=updates.poll())!=null)
        array[update.position] = update.value;

    return array;
}
于 2013-03-16T00:18:11.877 回答
2

另一个想法,假设数组只包含 20 个双精度数。

有两个数组,一个用于写入,一个用于读取。

读取器在读取期间锁定读取数组。

read()
    lock();
    read stuff
    unlock();

Writer先修改写数组,然后tryLock读数组,如果锁定失败,罚款,write()返回;如果锁定成功,则将写入数组复制到读取数组,然后释放锁定。

write()
    update write array
    if tryLock()
        copy write array to read array
        unlock()

可以阻止阅读器,但仅限于复制 20 个双打所需的时间,这很短。

读者应该使用自旋锁,do{}while(tryLock()==false);以免被挂起。

于 2013-03-16T01:05:06.723 回答
2

有没有更高效的数据结构?

是的,一点没错!它们被称为持久数据结构。它们能够仅通过存储与先前版本的差异来表示矢量/地图/等的新版本。所有版本都是不可变的,这使得它们适合并发(作者不会干扰/阻止读者,反之亦然)。

为了表达变化,人们将对持久数据结构的引用存储在引用类型中,例如AtomicReference,并更改这些引用指向的内容——而不是结构本身

Clojure提供了一流的持久数据结构实现。它们是用纯粹、高效的 Java 编写的。

以下程序公开了如何使用持久数据结构来解决您描述的问题。

import clojure.lang.IPersistentVector;
import clojure.lang.PersistentVector;

public class AtomicArrayUpdates {

    public static Map<Integer, AtomicReference<IPersistentVector>> pool
        = new HashMap<>();
    public static Random rnd = new Random();
    public static final int SIZE = 60000;
    // For simulating the reads/writes ratio
    public static final int SLEEP_TIMÉ = 5;

    static {        
        for (int i = 0; i < SIZE; i++) {
            pool.put(i, new AtomicReference(PersistentVector.EMPTY));
        }
    }

    public static class Writer implements Runnable {   
        @Override public void run() {
            while (true) {
                try {
                    Thread.sleep(SLEEP_TIMÉ);
                } catch (InterruptedException e) {}

                int index = rnd.nextInt(SIZE);
                IPersistentVector vec = pool.get(index).get();

                // note how we repeatedly assign vec to a new value
                // cons() means "append a value".
                vec = vec.cons(rnd.nextInt(SIZE + 1)); 
                // assocN(): "update" at index 0
                vec = vec.assocN(0, 42); 
                // appended values are nonsense, just an example!
                vec = vec.cons(rnd.nextInt(SIZE + 1)); 

                pool.get(index).set(vec);

            }
        }
    }

    public static class Reader implements Runnable {
        @Override public void run() {
            while (true) {
                try {
                    Thread.sleep(SLEEP_TIMÉ * 5);
                } catch (InterruptedException e) {}

                IPersistentVector vec = pool.get(rnd.nextInt(SIZE)).get();
                // Now you can do whatever you want with vec.
                // nothing can mutate it, and reading it doesn't block writers!
            }
        } 
    }

    public static void main(String[] args) {
        new Thread(new Writer()).start();
        new Thread(new Reader()).start();
    }
}
于 2013-03-16T01:52:49.650 回答
1

我会这样做:

  • 同步整个事情,看看性能是否足够好。考虑到你只有一个写线程和一个读线程,争用会很低,这可以很好地工作

    private final Map<Key, double[]> map = new HashMap<> ();
    
    public synchronized void write(Key key, double value, int index) {
        double[] array = map.get(key);
        array[index] = value;
    }
    
    public synchronized double[] read(Key key) {
        return map.get(key);
    }
    
  • 如果太慢,我会让作者复制数组,更改一些值并将新数组放回地图。请注意,数组副本非常快- 通常,一个 20 项数组很可能需要不到 100 纳秒

    //If all the keys and arrays are constructed before the writer/reader threads 
    //start, no need for a ConcurrentMap - otherwise use a ConcurrentMap
    private final Map<Key, AtomicReference<double[]>> map = new HashMap<> ();
    
    public void write(Key key, double value, int index) {
        AtomicReference<double[]> ref = map.get(key);
        double[] oldArray = ref.get();
        double[] newArray = oldArray.clone();
        newArray[index] = value;
        //you might want to check the return value to see if it worked
        //or you might just skip the update if another writes was performed
        //in the meantime
        ref.compareAndSet(oldArray, newArray);
    }
    
    public double[] read(Key key) {
        return map.get(key).get(); //check for null
    }
    

由于写入如此频繁,因此在读/写时创建新对象或锁定整个数组将非常昂贵。

多频繁?除非每毫秒有数百个,否则应该没问题。

另请注意:

  • 在 Java 中创建对象相当便宜(想想大约 10 个 CPU 周期 = 几纳秒)
  • 短期对象的垃圾回收通常是免费的(只要对象留在年轻代,如果它不可达,GC 就不会访问它)
  • 而长寿命对象对 GC 性能有影响,因为它们需要被复制到老年代
于 2013-03-16T11:56:57.087 回答
0
  • 您需要两个静态引用:readArraywriteArray一个简单的互斥锁来跟踪写入何时更改。

  • 有一个名为 changeWriteArray 的锁定函数对 writeArray 的 deepCopy 进行更改:

    synchronized String[] changeWriteArray(String[] writeArrayCopy, other params go here){ // 这里对 writeArray 的 deepCopy 进行修改

          //then return deepCopy
          return writeArrayCopy;
    }
    
  • 请注意,changeWriteArray它实际上是没有副作用的函数式编程,因为它返回的副本既不是readArray也不是writeArray

  • 打电话的人changeWriteArray必须称其为writeArray = changeWriteArray(writeArray.deepCopy()).

  • 互斥体由两者更改changeWriteArrayupdateReadArray但仅由updateReadArray. 如果设置了互斥锁,则将简单地指向实际块的updateReadArray引用readArraywriteArray

编辑:

@vemv 关于您提到的答案。虽然想法是相同的,但区别很重要:两个静态引用是static这样的,因此无需花费时间将更改实际复制到readArray; 而是将指针readArray移动到指向writeArray。实际上,我们通过changeWriteArray根据需要生成的 tmp 数组进行交换。此外,这里的锁定是最小的,因为阅读不需要锁定,因为您可以在任何给定时间拥有多个阅读器。

事实上,使用这种方法,您可以保留并发读者的计数,并检查计数器是否为零,以了解何时readArray更新writeArray; 再次,进一步说明阅读根本不需要锁定。

于 2013-03-16T05:58:41.813 回答
0

改进@zhong.j.yu 的答案,将写入排队而不是在写入发生时尝试执行它们确实是一个好主意。然而,当更新来得如此之快以至于读者会因不断地更新而窒息时,我们必须解决这个问题。我的想法是,如果读取只执行在读取之前排队的写入,而忽略后续写入(那些将是下一次阅读解决)。

您将需要编写自己的同步队列。它将基于链表,并且仅包含两种方法:

public synchronised enqeue(Write write);

此方法将以原子方式将写入排入队列。当写入速度比将它们排队所需的速度更快时,可能会出现死锁,但我认为每秒必须有数十万次写入才能实现这一目标。

public synchronised Element cut();

这将自动清空队列并将其头部(或尾部)作为 Element 对象返回。它将包含一系列其他元素(Element.next 等...,只是通常的链表内容),所有这些元素都代表自上次读取以来的写入链。然后队列将是空的,准备好接受新的写入。然后读者可以跟踪元素链(到那时它将是独立的,不受后续写入的影响),执行写入,最后执行读取。当阅读器处理读取时,新的写入将排入队列,但这将是下一次读取的问题。

我写过一次,虽然是用 C++ 来表示一个声音数据缓冲区。写入(驱动程序发送更多数据)多于读取(数据上的一些数学内容),而写入必须尽快完成。(数据是实时来的,所以我需要在驱动程序中准备好下一批之前保存它们。)

于 2013-03-17T23:50:45.493 回答
0

以下变体的灵感来自我之前的回答zhong.j.yu 的回答。

编写者不会干扰/阻止读者,反之亦然,并且不存在线程安全/可见性问题,或者正在进行微妙的推理。

public class V2 {

    static Map<Integer, AtomicReference<Double[]>> commited = new HashMap<>();
    static Random rnd = new Random();

    static class Writer {
        private Map<Integer, Double[]> writeable = new HashMap<>();
        void write() {        
            int i = rnd.nextInt(writeable.size());   
            // manipulate writeable.get(i)...
            commited.get(i).set(writeable.get(i).clone());
        }
    }

    static class Reader{
        void read() {
            double[] arr = commited.get(rnd.nextInt(commited.size())).get();
            // do something useful with arr...
        } 
    }

}
于 2013-03-16T03:10:19.800 回答
0

我有一个有趣的解决方案,使用三个数组和一个 volatile 布尔切换。基本上,两个线程都有自己的数组。此外,还有一个通过切换控制的共享阵列。

当编写器完成并且切换允许它时,它将新写入的数组复制到共享数组中并翻转切换。

同样,在阅读器启动之前,当切换允许时,它会将共享数组复制到自己的数组中并翻转切换。

public class MolecularArray {
    private final double[] writeArray;
    private final double[] sharedArray;
    private final double[] readArray;

    private volatile boolean writerOwnsShared;

    MolecularArray(int length) {
        writeArray = new double[length];
        sharedArray = new double[length];
        readArray = new double[length];
    }

    void read(Consumer<double[]> reader) {
        if (!writerOwnsShared) {
            copyFromTo(sharedArray, readArray);
            writerOwnsShared = true;
        }
        reader.accept(readArray);
    }

    void write(Consumer<double[]> writer) {
        writer.accept(writeArray);
        if (writerOwnsShared) {
            copyFromTo(writeArray, sharedArray);
            writerOwnsShared = false;
        }
    }

    private void copyFromTo(double[] from, double[] to) {
        System.arraycopy(from, 0, to, 0, from.length);
    }
}
  • 这取决于“单写线程和单读”假设。
  • 它从不阻塞。
  • 它使用恒定(尽管巨大)的内存量。
  • read没有任何干预的重复调用write不会复制,反之亦然。
  • 读者不一定会看到最新的数据,但它会看到从前一个write开始的数据read,如果有的话。

我想,这可以使用两个共享数组来改进。

于 2018-09-12T22:14:27.163 回答