0

我开发了一个队列,它允许单个消费者和生产者同时提供/轮询队列中的元素,而无需对每个提供/轮询进行同步或 CAS 操作。相反,当队列的尾部为空时,只需要一个原子操作。该队列旨在减少队列被缓冲并且消费者没有赶上生产者的情况下的延迟。

在这个问题中,我想审查实现(代码还没有被其他人审查过,所以获得第二意见会很好)并讨论一种我认为应该显着减少延迟的使用模式,以及这种架构是否可以可能比 LMAX 干扰器运行得更快。

代码在 github 上: https ://github.com/aranhakki/experimental-performance/blob/master/java/src/concurrency/messaging/ConcurrentPollOfferArrayQueue.java

/*
 * Copyright 2014 Aran Hakki
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package concurrency.messaging;

// A non-blocking queue which allows concurrent offer and poll operations with minimal contention.
// Contention in offer and poll operations only occurs when offerArray, which acts as an incomming message buffer,
// becomes full, and we must wait for it too be swapped with the pollArray, acting as a outgoing message buffer,
// the most simple analogy would be too imaging two buckets, one we fill and at the same time we empty another bucket
// which already contains some liquid, then at the point the initial bucket becomes full, we swap it with the bucket that
// is being emptied. 

// It's possible that this mechanism might be faster than the LMAX disruptor, need to create tests to confirm.

public final class ConcurrentPollOfferArrayQueue<T> {

    private T[] offerArray;
    private T[] pollArray;

    public ConcurrentPollOfferArrayQueue(T[] _pollArray){
        offerArray = (T[]) new Object[_pollArray.length];
        pollArray = _pollArray;
    }

    private int offerIndex = 0;
    private int pollIndex = 0;

    public void offer(T t){
        if (offerIndex<offerArray.length){
            offerArray[offerIndex] = t;
            offerIndex++;
        } else {
            while(!arrayFlipped){

            }
            arrayFlipped = false;
            offerIndex = 0;
            offer(t);
        }
    }

    private volatile boolean arrayFlipped = false;

    public T poll(){
        if (pollIndex<pollArray.length){
            T t = pollArray[pollIndex];
            pollArray[pollIndex] = null;
            pollIndex++;
            return t;
        } else {
            pollIndex = 0;
            T[] pollArrayTmp = pollArray;
            pollArray = offerArray;
            offerArray = pollArrayTmp;
            arrayFlipped = true;
            return poll();
        }

    }

}

通过使用这些队列中的许多来代替都引用同一个队列的多个生产者和消费者,我认为延迟可以显着减少。

考虑生产者 A、B、C 都引用单个队列 Q,而消费者 E、E 和 F 都引用同一个队列。这导致了以下一组关系,因此产生了很多争用:

A 写给 Q

B 写给 Q

C 写到 Q

E 写给 Q

D 写给 Q

F 写入 Q

使用我开发的队列,可以在每个生产者和单个消费者聚合线程之间建立一个队列,该线程将获取每个生产者队列尾部的元素并将它们放在消费者队列的头部。这将显着减少争用,因为我们只有一个写入器来写入一段内存。关系船现在看起来如下:

A writeTo headOf(AQ)

B writeTo headOf(BQ)

C writesTo headOf(CQ)

ConsumerAggregationThread writesTo tailOf(AQ)

ConsumerAggregationThread writesTo tailOf(BQ)

ConsumerAggregationThread writesTo tailOf(CQ)

ConsumerAggregationThread writesTo headOf(EQ)

ConsumerAggregationThread writesTo headOf(FQ)

ConsumerAggregationThread writesTo headOf(GQ)

E writeTo tailOf(EQ)

F writeTo tailOf(FQ)

G writeTo tailOf(GQ)

上述关系确保了单写原则。

我很想听听你的想法。

4

1 回答 1

0

大家觉得这个实现怎么样?我已经改变了它,以便轮询线程在 pollQueue 为空时触发队列切换。

/*
* Copyright 2014 Aran Hakki
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/



/* 
* A non-blocking queue which allows concurrent offer and poll operations with    minimal contention.
* Contention in offer and poll operations only occurs when pollQueue is empty and must be swapped with offer queue.
* This implementation does not make use of any low level Java memory optimizations e.g. using the Unsafe class or direct byte buffers,
* so its possible it could run much faster.
* If re-engineered to use lower level features its possible that this approach might be faster than the LMAX disruptor.
* I'm current observing an average latency of approx 6000ns.
*/

package concurrency.messaging;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentPollOfferQueue<T> {

private class ThreadSafeSizeQueue<T> {
    private Queue<T> queue = new LinkedList<T>();
    private volatile AtomicInteger size = new AtomicInteger(0);

    public int size(){
        return size.get();
    }

    public void offer(T value){
        queue.offer(value);
        size.incrementAndGet();
    }

    public T poll(){
        T value = queue.poll();
        if (value!=null){
            size.decrementAndGet();
        }
        return value;
    }
}

private volatile ThreadSafeSizeQueue<T> offerQueue;
private volatile ThreadSafeSizeQueue<T> pollQueue;

private int capacity;

public ConcurrentPollOfferQueue(int capacity){
    this.capacity = capacity;
    offerQueue = new ThreadSafeSizeQueue<T>();
    pollQueue = new ThreadSafeSizeQueue<T>();
}

public void offer(T value){
    while(offerQueue.size()==capacity){/* wait for consumer to finishing consuming pollQueue */}
    offerQueue.offer(value);
}

public T poll(){
    T polled;
    while((polled = pollQueue.poll())==null){
        if (pollQueue.size()==0){
            ThreadSafeSizeQueue<T> tmpQueue = offerQueue;
            offerQueue = pollQueue;
            pollQueue = tmpQueue;
        } 
    }
    return polled;
}

public boolean isEmpty(){
    return pollQueue.size()==0;
}
于 2015-03-12T01:08:54.620 回答