1

我正在尝试使用固定长度的字节数组数组来自定义阻塞队列的实现。我没有删除轮询元素,因此我调整了 put 方法以返回字节数组,以便可以直接写入(生产者线程使用 MappedByteBuffer 直接写入此字节数组)。我添加了“commitPut()”方法来简单地增加计数器并设置“长度”数组。(如果多个线程正在写入,这可能是并发问题,但我知道只有一个线程正在写入)。

以下是我目前拥有的。如果我逐步调试它会起作用,但是如果我“运行”它看起来会遇到一些锁定问题。我复制、剥离和调整了 ArrayBlockingQueue 代码。有更好知识的人可以看看课程并告诉我我做错了什么,或者如何做得更好(比如直接写入缓冲区并在同一步骤设置长度数组和计数器)?

public class ByteArrayBlockingQueue {

    private final int[] lens; // array to valid lengths
    private final byte[][] items; // array of byte arrays

    private int takeIndex = 0;
    private int putIndex = 0;
    private int count = 0;

    public volatile int polledLen = 0; // lenght of last polled byte array

    private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    final int inc(int i) {
        return (++i == items.length)? 0 : i;
    }

    public ByteArrayBlockingQueue(int capacity, int size, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new byte[capacity][size];
        this.lens = new int[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull  = lock.newCondition();
    }

    public byte[] put() throws InterruptedException {
        final byte[][] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();

            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            //insert(e, len);
            return items[putIndex];
        } finally {
            lock.unlock();
        }
    }

    public void commitPut(int lenBuf) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            lens[putIndex] = lenBuf;
            putIndex = inc(putIndex);
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public byte[] poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)
                return null;
            final byte[][] items = this.items;
            final int[] lens = this.lens;
            byte[] e = items[takeIndex];
            this.polledLen = lens[takeIndex];
            //items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return e;

        } finally {
            lock.unlock();
        }
    }
}
4

1 回答 1

0

如果队列回绕,则字节数组可能会在被消费者读取之前被重用和覆盖。简而言之,您需要一种commitGet方法来确保生产者在用新数据覆盖数组之前等待消费者。

但是,我的建议是您依靠java.util.concurrent.BlockingQueue有第二个队列将它们从消费者返回给生产者,并依靠java.nio.ByteByffer来跟踪长度。生产者将执行以下操作:

ByteBuffer buffer = bufferQueue.poll(); // instead of your put()
buffer.put(source);                     // fill buffer from source MappedByteBuffer
buffer.flip();                          // set length to the amount written
dataQueue.offer(buffer);                // instead of commitPut()

消费者会:

ByteBuffer buffer = dataQueue.poll();   // instead of your get()
buffer.get(...);                        // use data
buffer.clear();                         // reset length               
bufferQueue.offer(buffer);              // this is the missing commitGet()

您最初应该capacityfreeQueue. 但是请注意,这仍然会将数据从source缓冲区复制一次到队列中的临时缓冲区中,就像您的原始代码已经做的那样。

如果您真的不想复制数据(并确保在所有消费者都读取数据之前源不会更改!),您更好的选择是使用单个阻塞队列并插入从ByteBuffer.slice()获得的缓冲区将要传递给消费者的每个数据块的源缓冲区。然后这些将被垃圾收集,但应该比字节数组本身占用更少的内存。

于 2013-05-20T08:19:03.637 回答