44

我想要一个实现固定大小的循环缓冲区的简单类。它应该是高效的、易于使用的、通用的类型。

目前它不需要支持 MT。以后总能加个锁,反正不会是高并发的。

方法应该是:.Add()我猜.List(),我在哪里检索所有条目。再三考虑,我认为应该通过索引器进行检索。在任何时候,我都希望能够通过index检索缓冲区中的任何元素。但请记住,从一个时刻到下一个 Element[n] 可能会有所不同,因为循环缓冲区会填满并翻转。这不是一个堆栈,它是一个循环缓冲区。

关于“溢出”:我希望内部会有一个包含项目的数组,并且随着时间的推移,缓冲区的头部尾部将围绕该固定数组旋转。但这对用户来说应该是不可见的。不应有外部可检测到的“溢出”事件或行为。

这不是学校作业——它最常用于MRU 缓存或固定大小的事务或事件日志。

4

13 回答 13

22

我会使用一个 T 数组,一个头尾指针,以及 add 和 get 方法。

喜欢:(Bug 搜索留给用户)

// Hijack these for simplicity
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;

public class CircularBuffer<T> {

  private T[] buffer;

  private int tail;

  private int head;

  @SuppressWarnings("unchecked")
  public CircularBuffer(int n) {
    buffer = (T[]) new Object[n];
    tail = 0;
    head = 0;
  }

  public void add(T toAdd) {
    if (head != (tail - 1)) {
        buffer[head++] = toAdd;
    } else {
        throw new BufferOverflowException();
    }
    head = head % buffer.length;
  }

  public T get() {
    T t = null;
    int adjTail = tail > head ? tail - buffer.length : tail;
    if (adjTail < head) {
        t = (T) buffer[tail++];
        tail = tail % buffer.length;
    } else {
        throw new BufferUnderflowException();
    }
    return t;
  }

  public String toString() {
    return "CircularBuffer(size=" + buffer.length + ", head=" + head + ", tail=" + tail + ")";
  }

  public static void main(String[] args) {
    CircularBuffer<String> b = new CircularBuffer<String>(3);
    for (int i = 0; i < 10; i++) {
        System.out.println("Start: " + b);
        b.add("One");
        System.out.println("One: " + b);
        b.add("Two");
        System.out.println("Two: " + b);
        System.out.println("Got '" + b.get() + "', now " + b);

        b.add("Three");
        System.out.println("Three: " + b);
        // Test Overflow
        // b.add("Four");
        // System.out.println("Four: " + b);

        System.out.println("Got '" + b.get() + "', now " + b);
        System.out.println("Got '" + b.get() + "', now " + b);
        // Test Underflow
        // System.out.println("Got '" + b.get() + "', now " + b);

        // Back to start, let's shift on one
        b.add("Foo");
        b.get();
    }
  }
}
于 2009-02-26T11:33:19.140 回答
8

这就是我将(或确实)在 Java 中编写高效循环缓冲区的方式。它由一个简单的数组支持。对于我的特定用例,我需要高并发吞吐量,因此我使用 CAS 来分配索引。然后,我创建了可靠副本的机制,包括整个缓冲区的 CAS 副本。我在一个简短的文章中更详细地概述的案例中使用了它。

import java.util.concurrent.atomic.AtomicLong;
import java.lang.reflect.Array;

/**
 * A circular array buffer with a copy-and-swap cursor.
 *
 * <p>This class provides an list of T objects who's size is <em>unstable</em>.
 * It's intended for capturing data where the frequency of sampling greatly
 * outweighs the frequency of inspection (for instance, monitoring).</p>
 *
 * <p>This object keeps in memory a fixed size buffer which is used for
 * capturing objects.  It copies the objects to a snapshot array which may be
 * worked with.  The size of the snapshot array will vary based on the
 * stability of the array during the copy operation.</p>
 *
 * <p>Adding buffer to the buffer is <em>O(1)</em>, and lockless.  Taking a
 * stable copy of the sample is <em>O(n)</em>.</p>
 */
public class ConcurrentCircularBuffer <T> {
    private final AtomicLong cursor = new AtomicLong();
    private final T[]      buffer;
    private final Class<T> type;

    /**
     * Create a new concurrent circular buffer.
     *
     * @param type The type of the array.  This is captured for the same reason
     * it's required by {@link java.util.List.toArray()}.
     *
     * @param bufferSize The size of the buffer.
     *
     * @throws IllegalArgumentException if the bufferSize is a non-positive
     * value.
     */
    public ConcurrentCircularBuffer (final Class <T> type, 
                                     final int bufferSize) 
    {
        if (bufferSize < 1) {
            throw new IllegalArgumentException(
                "Buffer size must be a positive value"
                );
        }

        this.type    = type;
        this.buffer = (T[]) new Object [ bufferSize ];
    }

    /**
     * Add a new object to this buffer.
     *
     * <p>Add a new object to the cursor-point of the buffer.</p>
     *
     * @param sample The object to add.
     */
    public void add (T sample) {
        buffer[(int) (cursor.getAndIncrement() % buffer.length)] = sample;
    }

    /**
     * Return a stable snapshot of the buffer.
     *
     * <p>Capture a stable snapshot of the buffer as an array.  The snapshot
     * may not be the same length as the buffer, any objects which were
     * unstable during the copy will be factored out.</p>
     * 
     * @return An array snapshot of the buffer.
     */
    public T[] snapshot () {
        T[] snapshots = (T[]) new Object [ buffer.length ];

        /* Determine the size of the snapshot by the number of affected
         * records.  Trim the size of the snapshot by the number of records
         * which are considered to be unstable during the copy (the amount the
         * cursor may have moved while the copy took place).
         *
         * If the cursor eliminated the sample (if the sample size is so small
         * compared to the rate of mutation that it did a full-wrap during the
         * copy) then just treat the buffer as though the cursor is
         * buffer.length - 1 and it was not changed during copy (this is
         * unlikley, but it should typically provide fairly stable results).
         */
        long before = cursor.get();

        /* If the cursor hasn't yet moved, skip the copying and simply return a
         * zero-length array.
         */
        if (before == 0) {
            return (T[]) Array.newInstance(type, 0);
        }

        System.arraycopy(buffer, 0, snapshots, 0, buffer.length);

        long after          = cursor.get();
        int  size           = buffer.length - (int) (after - before);
        long snapshotCursor = before - 1;

        /* Highly unlikely, but the entire buffer was replaced while we
         * waited...so just return a zero length array, since we can't get a
         * stable snapshot...
         */
        if (size <= 0) {
            return (T[]) Array.newInstance(type, 0);
        }

        long start = snapshotCursor - (size - 1);
        long end   = snapshotCursor;

        if (snapshotCursor < snapshots.length) {
            size   = (int) snapshotCursor + 1;
            start  = 0;
        }

        /* Copy the sample snapshot to a new array the size of our stable
         * snapshot area.
         */
        T[] result = (T[]) Array.newInstance(type, size);

        int startOfCopy = (int) (start % snapshots.length);
        int endOfCopy   = (int) (end   % snapshots.length);

        /* If the buffer space wraps the physical end of the array, use two
         * copies to construct the new array.
         */
        if (startOfCopy > endOfCopy) {
            System.arraycopy(snapshots, startOfCopy,
                             result, 0, 
                             snapshots.length - startOfCopy);
            System.arraycopy(snapshots, 0,
                             result, (snapshots.length - startOfCopy),
                             endOfCopy + 1);
        }
        else {
            /* Otherwise it's a single continuous segment, copy the whole thing
             * into the result.
             */
            System.arraycopy(snapshots, startOfCopy,
                             result, 0, endOfCopy - startOfCopy + 1);
        }

        return (T[]) result;
    }

    /**
     * Get a stable snapshot of the complete buffer.
     *
     * <p>This operation fetches a snapshot of the buffer using the algorithm
     * defined in {@link snapshot()}.  If there was concurrent modification of
     * the buffer during the copy, however, it will retry until a full stable
     * snapshot of the buffer was acquired.</p>
     *
     * <p><em>Note, for very busy buffers on large symmetric multiprocessing
     * machines and supercomputers running data processing intensive
     * applications, this operation has the potential of being fairly
     * expensive.  In practice on commodity hardware, dualcore processors and
     * non-processing intensive systems (such as web services) it very rarely
     * retries.</em></p>
     *
     * @return A full copy of the internal buffer.
     */
    public T[] completeSnapshot () {
        T[] snapshot = snapshot();

        /* Try again until we get a snapshot that's the same size as the
         * buffer...  This is very often a single iteration, but it depends on
         * how busy the system is.
         */
        while (snapshot.length != buffer.length) {
            snapshot = snapshot();
        }

        return snapshot;
    }

    /**
     * The size of this buffer.
     */
    public int size () {
        return buffer.length;
    }
}
于 2010-05-24T23:01:58.640 回答
4

我会根据需要使用ArrayBlockingQueue或其他预构建的队列实现之一。很少需要自己实现这样的数据结构(除非是学校作业)。

编辑:既然您已经添加了“通过索引检索缓冲区中的任何元素”的要求,我想您需要实现自己的类(除非google-collections或其他一些库提供了一个)。正如 JeeBee 的示例所示,循环缓冲区很容易实现。你也可以看看 ArrayBlockingQueue 的源代码——它的代码很干净,只是去掉了锁定和不需要的方法,并添加了通过索引访问它的方法。

于 2009-02-26T11:58:29.640 回答
4

这是我在生产代码中使用的 Java 即用型 CircularArrayList 实现。通过以 Java 推荐的方式覆盖 AbstractList,它支持 Java Collections Framework 中标准 List 实现所期望的所有功能(通用元素类型、子列表、迭代等)。

以下调用在 O(1) 中完成:

  • add(item) - 在列表末尾添加
  • remove(0) - 从列表的开头删除
  • get(i) - 检索列表中的随机元素
于 2011-07-03T21:42:19.897 回答
4

使用 Java 的ArrayDeque

于 2011-12-08T07:31:45.447 回答
2

只需使用别人的实现:

Power Collections Deque<T>由循环缓冲区实现。

电源集合库不完整,但 Deque 是完全可以接受的扩展循环缓冲区。

由于您表明您不希望扩展而是希望覆盖,您可以相当容易地修改代码以覆盖。这将简单地涉及删除对逻辑上相邻的指针的检查,并且无论如何都只是写。同时可以将私有缓冲区设为只读。

于 2009-02-26T19:52:58.610 回答
1

System.Collections.Generic.Queue - 内部是简单的循环缓冲区(带有头部和尾部的 T[],就像来自 JeeBee 的示例一样)。

于 2010-05-10T18:57:35.750 回答
1

Guava 15 中,我们引入了EvictingQueue,这是一个非阻塞的有界队列,当尝试将元素添加到完整队列时,它会自动从队列头部驱逐(删除)元素。这与传统的有界队列不同,后者在满时阻塞或拒绝新元素。

听起来这应该适合您的需求,并且具有比ArrayDeque直接使用更简单的界面(尽管它在引擎盖下使用了一个!)。

更多信息可以在这里找到。

于 2015-07-31T17:35:50.943 回答
1

我想从java的角度来回答这个问题。

要使用 java 实现循环缓冲区,您可能需要三样东西,包括:循环缓冲区类、泛型和对它的少量操作(为了了解您需要哪些操作以及这些操作中的内部机制,您可能需要阅读wiki首先是循环缓冲区)。

其次,缓冲区满或空的判断要非常小心。这里我给出两个本能的全/空判断解决方案。在解决方案一中,您需要创建两个整数变量来存储缓冲区的当前大小和缓冲区的最大大小。显然,如果当前大小等于最大大小,则缓冲区已满。

在另一种解决方案中,我们将最后一个存储位置设置为空闲(对于大小为 7 的循环缓冲区,我们将存储设置为空闲的 7)。据此,我们可以在(rp+1)%MAXSIZE == fp;满足表达式时确定缓冲区已满。

为了更清楚,这里给出了一种使用 java 语言的实现。

import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;        

public class CircularBuffer<T> {
    private int front;
    private int rear;
    private int currentSize;
    private int maxSize;
    private T[] buffer;

    public CircularBuffer(int n) {
        buffer = (T[]) new Object[n];
        front = 0;
        rear = 0;
        currentSize = 0;
        maxSize = n;
    }

    public void push(T e) {
        if (!isFull()) {
            buffer[rear] = e;
            currentSize++;
            rear = (rear + 1) % maxSize;
        } else throw new BufferOverflowException();
    }

    public T pop() {
        if (!isEmpty()) {
            T temp = buffer[front];
            buffer[front] = null;
            front = (front + 1) % maxSize;
            currentSize--;
            return temp;
        } else throw new BufferUnderflowException();
    }

    public T peekFirst() {
        if (!isEmpty()) {
            return buffer[front];
        } else  return null;
    }

    public T peekLast() {
        if (!isEmpty()) {
            return buffer[rear - 1];
        } else return null;
    }

    public int size() {
        return currentSize;
    }

    public boolean isEmpty() {
        if (currentSize == 0) {
            return true;
        } else return false;
    }

    public boolean isFull() {
        if (currentSize == maxSize) {
            return true;
        } else return false;
    }

    public boolean clean() { 
        front = 0;          
        rear = 0;
        while (rear != 0) {
            buffer[rear] = null;
            rear = (rear + 1) % maxSize;
        }   
        return true;
    }

    public static void main(String[] args) {
        CircularBuffer<Integer> buff = new CircularBuffer<>(7);
        buff.push(0);
        buff.push(1);
        buff.push(2);
        System.out.println(buff.size());
        System.out.println("The head element is: " + buff.pop());
        System.out.println("Size should be twoo: " + buff.size());
        System.out.println("The last element is one: " + buff.peekLast());
        System.out.println("Size should be two: " + buff.size());
        buff.clean();
        System.out.println("Size should be zero: " + buff.size());

    }
}
于 2017-03-06T12:11:03.123 回答
0

这是我为自己使用而编写的实现,但这可能很有用。

缓冲区包含最大的固定项集。该集合是循环的,旧项目会自动删除。调用者可以通过绝对增量索引(长)获取项目尾部,但项目可能在时间太远的调用之间丢失。这个类是完全线程安全的。

public sealed class ConcurrentCircularBuffer<T> : ICollection<T>
{
    private T[] _items;
    private int _index;
    private bool _full;

    public ConcurrentCircularBuffer(int capacity)
    {
        if (capacity <= 1) // need at least two items
            throw new ArgumentException(null, "capacity");

        Capacity = capacity;
        _items = new T[capacity];
    }

    public int Capacity { get; private set; }
    public long TotalCount { get; private set; }

    public int Count
    {
        get
        {
            lock (SyncObject) // full & _index need to be in sync
            {
                return _full ? Capacity : _index;
            }
        }
    }

    public void AddRange(IEnumerable<T> items)
    {
        if (items == null)
            return;

        lock (SyncObject)
        {
            foreach (var item in items)
            {
                AddWithLock(item);
            }
        }
    }

    private void AddWithLock(T item)
    {
        _items[_index] = item;
        _index++;
        if (_index == Capacity)
        {
            _full = true;
            _index = 0;
        }
        TotalCount++;
    }

    public void Add(T item)
    {
        lock (SyncObject)
        {
            AddWithLock(item);
        }
    }

    public void Clear()
    {
        lock (SyncObject)
        {
            _items = new T[Capacity];
            _index = 0;
            _full = false;
            TotalCount = 0;
        }
    }

    // this gives raw access to the underlying buffer. not sure I should keep that
    public T this[int index]
    {
        get
        {
            return _items[index];
        }
    }

    public T[] GetTail(long startIndex)
    {
        long lostCount;
        return GetTail(startIndex, out lostCount);
    }

    public T[] GetTail(long startIndex, out long lostCount)
    {
        if (startIndex < 0 || startIndex >= TotalCount)
            throw new ArgumentOutOfRangeException("startIndex");

        T[] array = ToArray();
        lostCount = (TotalCount - Count) - startIndex;
        if (lostCount >= 0)
            return array;

        lostCount = 0;

        // this maybe could optimized to not allocate the initial array
        // but in multi-threading environment, I suppose this is arguable (and more difficult).
        T[] chunk = new T[TotalCount - startIndex];
        Array.Copy(array, array.Length - (TotalCount - startIndex), chunk, 0, chunk.Length);
        return chunk;
    }

    public T[] ToArray()
    {
        lock (SyncObject)
        {
            T[] items = new T[_full ? Capacity : _index];
            if (_full)
            {
                if (_index == 0)
                {
                    Array.Copy(_items, items, Capacity);
                }
                else
                {
                    Array.Copy(_items, _index, items, 0, Capacity - _index);
                    Array.Copy(_items, 0, items, Capacity - _index, _index);
                }
            }
            else if (_index > 0)
            {
                Array.Copy(_items, items, _index);
            }
            return items;
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        return ToArray().AsEnumerable().GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    bool ICollection<T>.Contains(T item)
    {
        return _items.Contains(item);
    }

    void ICollection<T>.CopyTo(T[] array, int arrayIndex)
    {
        if (array == null)
            throw new ArgumentNullException("array");

        if (array.Rank != 1)
            throw new ArgumentException(null, "array");

        if (arrayIndex < 0)
            throw new ArgumentOutOfRangeException("arrayIndex");

        if ((array.Length - arrayIndex) < Count)
            throw new ArgumentException(null, "array");

        T[] thisArray = ToArray();
        Array.Copy(thisArray, 0, array, arrayIndex, thisArray.Length);
    }

    bool ICollection<T>.IsReadOnly
    {
        get
        {
            return false;
        }
    }

    bool ICollection<T>.Remove(T item)
    {
        return false;
    }

    private static object _syncObject;
    private static object SyncObject
    {
        get
        {
            if (_syncObject == null)
            {
                object obj = new object();
                Interlocked.CompareExchange(ref _syncObject, obj, null);
            }
            return _syncObject;
        }
    }
}
于 2014-04-27T17:29:29.583 回答
0

这是另一个使用 Apache 公共集合的 BoundedFifoBuffer 的实现。如果您使用来自 Apache 的最新 JAR,请使用 CircularFifoQueue,因为不推荐使用以下类

    BoundedFifoBuffer apiCallHistory = new BoundedFifoBuffer(20);

    for(int i =1 ; i < 25; i++){

        if(apiCallHistory.isFull()){
          System.out.println("removing :: "+apiCallHistory.remove());
        }
        apiCallHistory.add(i);

}
于 2014-06-04T02:03:21.727 回答
-8
// The following is in C#

public class fqueue
{

  // The following code implements a circular queue of objects

  //private data:

    private bool empty;
    private bool full;

    private int begin, end;

    private object[] x;

  //public data:

    public fqueue()
    {
        empty = !(full = false);
        begin = end = 0xA2;

        x = new object[256];
        return;
    }

    public fqueue(int size)
    {
        if (1 > size) throw new Exception("fqueue: Size cannot be zero or negative");

        empty = !(full = false);
        begin = end = 0xA2;

        x = new object[size];
        return;
    }

    public object write
    {
        set
        {
            if(full) throw new Exception("Write error: Queue is full");

            end = empty ? end : (end + 1) % x.Length;

            full = ((end + 1) % x.Length) == begin;
            empty = false;

            x[end] = value;
        }
    }

    public object read
    {
        get
        {
            if(empty) throw new Exception("Read error: Queue is empty");
            full = false;

            object ret = x[begin];

            begin = (empty=end==begin) ?
                begin :
                (begin + 1) % x.Length;

            return ret;
        }
    }

    public int maxSize
    {
        get
        {
            return x.Length;
        }
    }

    public int queueSize
    {
        get
        {
            return end - begin + (empty ? 0 : 1 + ((end < begin) ? x.Length : 0));
        }
    }

    public bool isEmpty
    {
        get
        {
            return empty;
        }
    }

    public bool isFull
    {
        get
        {
            return full;
        }
    }

    public int start
    {
        get
        {
            return begin;
        }
    }        

    public int finish
    {
        get
        {
            return end;
        }
    }
}
于 2010-05-22T21:25:43.770 回答