我想要一个生产者消费者问题,只消费最新的项目。这个问题可能有不同的名称,但我想不通!
生产者线程通过覆盖任何旧项目以非阻塞方式生成元素。单个消费者线程应该等待一个元素被创建并使用它。
我考虑过使用阻塞队列,但 java 实现不允许覆盖旧元素。循环缓冲区(如来自 commons 库)也不起作用,因为它不会阻塞消费者。
是否有用于此目的的数据结构,还是我需要找到更好的方法?
也可以使用锁等低级同步工具来解决这个问题,但我不知道该怎么做。
我想要一个生产者消费者问题,只消费最新的项目。这个问题可能有不同的名称,但我想不通!
生产者线程通过覆盖任何旧项目以非阻塞方式生成元素。单个消费者线程应该等待一个元素被创建并使用它。
我考虑过使用阻塞队列,但 java 实现不允许覆盖旧元素。循环缓冲区(如来自 commons 库)也不起作用,因为它不会阻塞消费者。
是否有用于此目的的数据结构,还是我需要找到更好的方法?
也可以使用锁等低级同步工具来解决这个问题,但我不知道该怎么做。
不需要特殊的数据结构。只需使用Object
. 他们在这种情况下非常好,因为阻塞消费者:
class ItemHolder<T> {
private T item;
public synchronized void produce(T item) {
this.item = item;
notify();
}
public synchronized T consume() {
while (item == null) {
wait();
}
T result = item;
item = null;
return result;
}
}
如果您想对最近的数据窗口进行操作,则覆盖循环缓冲区是一种很好的数据结构。元素像队列一样被添加和删除 FIFO,除了满缓冲区的添加将导致最旧的(队列头)元素被删除。
import java.util.NoSuchElementException;
/**
* Thread safe fixed size circular buffer implementation. Backed by an array.
*
* @author brad
*/
public class ArrayCircularBuffer<T> {
// internal data storage
private T[] data;
// indices for inserting and removing from queue
private int front = 0;
private int insertLocation = 0;
// number of elements in queue
private int size = 0;
/**
* Creates a circular buffer with the specified size.
*
* @param bufferSize
* - the maximum size of the buffer
*/
public ArrayCircularBuffer(int bufferSize) {
data = (T[]) new Object[bufferSize];
}
/**
* Inserts an item at the end of the queue. If the queue is full, the oldest
* value will be removed and head of the queue will become the second oldest
* value.
*
* @param item
* - the item to be inserted
*/
public synchronized void insert(T item) {
data[insertLocation] = item;
insertLocation = (insertLocation + 1) % data.length;
/**
* If the queue is full, this means we just overwrote the front of the
* queue. So increment the front location.
*/
if (size == data.length) {
front = (front + 1) % data.length;
} else {
size++;
}
}
/**
* Returns the number of elements in the buffer
*
* @return int - the number of elements inside this buffer
*/
public synchronized int size() {
return size;
}
/**
* Returns the head element of the queue.
*
* @return T
*/
public synchronized T removeFront() {
if (size == 0) {
throw new NoSuchElementException();
}
T retValue = data[front];
front = (front + 1) % data.length;
size--;
return retValue;
}
/**
* Returns the head of the queue but does not remove it.
*
* @return
*/
public synchronized T peekFront() {
if (size == 0) {
return null;
} else {
return data[front];
}
}
/**
* Returns the last element of the queue but does not remove it.
*
* @return T - the most recently added value
*/
public synchronized T peekLast() {
if (size == 0) {
return null;
} else {
int lastElement = insertLocation - 1;
if (lastElement < 0) {
lastElement = data.length - 1;
}
return data[lastElement];
}
}
}
这是循环有界队列,它(应该是)线程安全的并提供阻塞take
操作。
public class CircularQueue<T> {
private final int MAX_SIZE;
private final AtomicReferenceArray<T> buffer;
private final AtomicInteger start;
private final AtomicInteger end;
private final AtomicInteger len;
private final ReentrantLock rwlock;
private final Condition readCondition;
public CircularQueue(int size) {
MAX_SIZE = size;
buffer = new AtomicReferenceArray<T>(size);
start = new AtomicInteger(0);
end = new AtomicInteger(0);
len = new AtomicInteger(0);
rwlock = new ReentrantLock(true);
readCondition = rwlock.newCondition();
}
/**
* Adds to tail of the queue
*/
public void put(T val) {
try {
rwlock.lock();
buffer.set(end.get(), val);
end.set((end.get() + 1) % MAX_SIZE);
if (len.get() == MAX_SIZE) { // overwrite
start.set((start.get() + 1) % MAX_SIZE);
} else {
len.incrementAndGet();
}
readCondition.signal();
} finally {
rwlock.unlock();
}
}
/**
* Blocking removeFront operation
* @return
*/
public T take() {
T val = null;
try {
rwlock.lock();
while (len.get() == 0) {
try {
readCondition.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
val = buffer.get(start.get());
buffer.set(start.get(), null);
start.set((start.get() + 1) % MAX_SIZE);
len.decrementAndGet();
} finally {
rwlock.unlock();
}
return val;
}
public int size() {
int curLen = 0;
try {
rwlock.lock();
curLen = len.get();
} finally {
rwlock.unlock();
}
return curLen;
}
}
有许多操作尚未添加poll
,例如offer
等。但是您可以使用一些线程进行测试:
如果它运行正确,它将挂起你的 JVM。
public static void main(String[] args) {
final int MAX_QUEUE_SIZE = 4;
final CircularQueue<Integer> q = new CircularQueue<Integer>(MAX_QUEUE_SIZE);
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < MAX_QUEUE_SIZE; ++i) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Putting: from " + Thread.currentThread().getName() + " " + i);
q.put(i);
}
for (int i = 0; i < MAX_QUEUE_SIZE; ++i) {
System.out.println("Trying to get from " + Thread.currentThread().getName() + " " + q.take());
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 10; i < 10 + MAX_QUEUE_SIZE; ++i) {
try {
Thread.sleep(1001);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Putting: from " + Thread.currentThread().getName() + " " + i);
q.put(i);
}
for (int i = 0; i < MAX_QUEUE_SIZE; ++i) {
System.out.println("Trying to get from " + Thread.currentThread().getName() + " " + q.take());
}
}
}).start();
}
您的输出应该可能匹配
Putting: from Thread-0 0
Putting: from Thread-1 10
Putting: from Thread-0 1
Putting: from Thread-1 11
Putting: from Thread-0 2
Putting: from Thread-1 12
Putting: from Thread-0 3
Trying to get from Thread-0 11
Trying to get from Thread-0 2
Trying to get from Thread-0 12
Trying to get from Thread-0 3
Putting: from Thread-1 13
Trying to get from Thread-1 13
来自 Thread-1 的其他 take 操作正在等待相应的 put 操作,因为 Thread-1 比 Thread-0 稍慢。
Java为此提供的最简单的解决方案是:
Per doc:“创建一个执行器,它使用单个工作线程操作无界队列,并在需要时使用提供的 ThreadFactory 创建一个新线程”