4

我正在开发一个应用程序,在该应用程序中我不断接收消息。我将这些消息存储在内存数据结构中(比如列表)。我想将这些消息写入文件,但只有在列表大小达到某个阈值之后,例如 100 条消息(对消息执行批处理)。一种方法是我可以在收到每条消息后简单地检查列表大小,如果达到阈值,则调用函数将消息写入文件。但是这种方法的问题是:

  1. 调用函数可能需要无限期地等待,直到所有消息都写入文件
  2. 传入的消息可能会在此过程中丢失,或者可能需要等待存储在列表中。

其他方式可能是产生一个新线程,它将独立地将消息写入文件。但是,当我将列表(包含消息)传递给执行写操作的线程时,它会使用不断进来的新消息进行更新。因此,新到达的消息也会被写入文件中,这是不期望的。

这不应该发生,因为我打算在下一批中写入新消息。

有人可以建议我解决此要求,或者对上述方法进行任何改进以解决我的问题。

4

5 回答 5

5

我发现一个更干净的解决方案是支持自动批处理。即批次的大小随着传入数据的速率而调整。

为此,您可以使用 BlockingQueue

// unbound queue will not block the producer.
final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();

// to add an element.
queue.add(element);

// to get a batch of data
List<T> list = new ArrayList<T>(maxElements);
while(writing) {
    T t = queue.take(); // wait for at least one element.
    list.add(t);
    queue.drainTo(list, maxElements-1);
    // process list, e.g. write to a file.
    list.clear();
}

这种方法的好处是,如果生产者非常慢,你不会让元素保持不合理的时间,但随着速率的增加,批量大小自然会增长到任何能够跟上生产者的速度,这意味着你不必决定使用的最佳批量大小。

于 2012-10-09T14:03:24.387 回答
1

我建议采用以下方法:

  1. 将对消息列表的引用保存在AtomicReference.
  2. 当列表足够满时,将其替换为新的空列表;
  3. 将完整列表传递给将消息保存到文件的工作线程。

如果您从单个线程写入列表,则使用普通引用而不是AtomicReference.

于 2012-10-09T14:00:54.760 回答
1

重要的是要理解在 Java中你永远不会传递对象——只有引用(或原始值)。

选项:

  • 创建列表的副本,并将对该副本的引用传递给您的新线程
  • 使用生产者/消费者队列,因此您的“生产”线程只会值添加到队列中,而您的消费者线程只会从队列中获取项目以将它们写入磁盘。当然,您需要考虑在队列停止接受更多条目之前您希望队列可能达到多大。

我推荐后一种方法,使用java.util.concurrent包中的类来实现它;特别是BlockingQueue<E>实现。

于 2012-10-09T14:02:19.070 回答
0

为什么不让主消息接收进程在将旧消息列表传递给文件写入线程后创建一个新消息列表?

于 2012-10-09T14:02:58.867 回答
0

BoundedQueue您可以使用接受说对象的条件来实现自定义100,然后一次性编写。

现在您可以 BoundedQueue与不同的线程共享这个类实例,这些线程会将对象放入其中,并且会有线程调用writeAll()方法,直到您想要调用它。

BoundedBuffer boundedBuffer  = new BoundedBuffer();
boundedBuffer.put("test"); .......

从写线程做下面

boundedBuffer.writeAll();

下面是示例代码

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition full = lock.newCondition();
final Condition empty = lock.newCondition();

final Object[] items = new Object[100];
int count;

public void put(Object x) throws InterruptedException {
    lock.lock();
    try {
        while (count == items.length) {
            empty.signal();
            full.await();
        }
        items[count] = x;
        ++count;
    } finally {
        lock.unlock();
    }
}

public void writeAll() throws InterruptedException {
    lock.lock();
    try {
        while (count < items.length)
            empty.await();
        // Write to file here After write finished signal full condition
        count = 0;
        full.signal();

    } finally {
        lock.unlock();
    }
}
}
于 2012-10-09T14:03:08.940 回答