4

我正在基于Java的ConcurrentSkipListMap实现一个并发跳过列表映射,不同之处在于我希望列表允许重复,并且我还希望列表是可索引的(以便查找列表的第 N 个元素需要 O(lg( n)) 时间,而不是标准跳过列表的 O(n) 时间)。这些修改不存在问题。

此外,跳过列表的键是可变的。例如,如果列表元素是整数 {0, 4, 7},那么中间元素的 key 可以更改为 [0, 7] 中的任何值,而不会提示更改列表结构;如果键更改为 (-inf, -1] 或 [8, +inf),则删除并重新添加元素以保持列表顺序。我没有将其实现为移除后跟 O(lg(n)) 插入,而是将其实现为移除后跟线性遍历后跟 O(1) 插入(预期运行时间为 O(1) - 99节点将与相邻节点交换的时间百分比)。

很少会插入一个全新的节点(在启动后),并且永远不会删除一个节点(没有立即重新添加它);几乎所有的操作都是 elementAt(i) 来检索第 i 个索引处的元素,或者是在修改键后交换节点的操作。

我遇到的问题是如何实现密钥修改类。从概念上讲,我想做类似的事情

public class Node implements Runnable {
    private int key;
    private Node prev, next;
    private BlockingQueue<Integer> queue;

    public void update(int i) {
        queue.offer(i);
    }

    public void run() {
        while(true) {
            int temp = queue.take();
            temp += key;
            if(prev.getKey() > temp) {
                // remove node, update key to temp, perform backward linear traversal, and insert
            } else if(next.getKey() < temp) {
                // remove node, update key to temp, perform forward linear traveral, and insert
            } else {
                key = temp; // node doesn't change position
            }
        }
    }
}

insert被调用的子方法run使用 CAS 来处理两个节点尝试同时在同一位置插入的问题(类似于ConcurrentSkipListMap处理冲突插入的方式) - 从概念上讲,这与第一个节点锁定节点相同与插入点相邻,但在没有冲突的情况下会减少开销。)

这样我可以确保列表总是有序的(如果密钥更新有点延迟也没关系,因为我可以确定更新最终会发生;但是,如果列表变得无序,那么事情可能会变得混乱)。问题在于以这种方式实现列表会生成大量线程,每个线程Node(列表中有数千个节点) - 其中大多数会在任何给定时间点阻塞,但我担心有数千个阻塞线程仍然会导致过高的开销。

另一种选择是使update方法同步并Runnable从中删除接口Node,以便两个线程将更新排队Node,然后在其单独的线程上处理这些更新,这两个线程将轮流执行该Node#update方法。问题是这可能会造成瓶颈。如果八个不同的线程都决定一次更新同一个节点,那么队列实现可以很好地扩展,但是同步实现会阻塞八个线程中的七个(然后会阻塞六个线程,然后是五个,等等)。

所以我的问题是,我将如何实现类似于队列实现的东西,除非减少线程数量,否则我将如何实现类似同步实现的东西,除非没有潜在的瓶颈问题。

4

1 回答 1

0

我想我可以用ThreadPoolExecutor类似的东西来解决这个问题

public class Node {
    private int key;
    private Node prev, next;
    private ConcurrentLinkedQueue<Integer> queue;
    private AtomicBoolean lock = new AtomicBoolean(false);
    private ThreadPoolExecutor executor;
    private UpdateNode updater = new UpdateNode();

    public void update(int i) {
        queue.offer(i);
        if(lock.compareAndSet(false, true)) {
            executor.execute(updater);
        }
    }

    private class UpdateNode implements Runnable {
        public void run() {
            do {
                try {
                    int temp = key;
                    while(!queue.isEmpty()) {
                        temp += queue.poll();
                    }
                    if(prev.getKey() > temp) {
                        // remove node, update key to temp, perform backward linear traversal, and insert
                    } else if(next.getKey() < temp) {
                        // remove node, update key to temp, perform forward linear traveral, and insert
                    } else {
                        key = temp; // node doesn't change position
                    }
                } finally {
                    lock.set(false);
                }
            } while (!queue.isEmpty() && lock.compareAndSet(false, true));
        }
    }
}

这样我就拥有了队列方法的优势,而没有一千个线程被阻塞 - 我UpdateNode每次需要更新节点时都执行 a (除非已经有一个UpdateNode正在执行的节点Node,因此AtomicBoolean它充当锁),并且依靠ThreadPoolExecutor它来降低创建数千个可运行文件的成本。

于 2013-05-14T13:35:08.580 回答