1

我希望多次写入文件(100k+),写入将通过不稳定的网络进行。所以要做到这一点,我正在考虑使用 JavaExecutorService来帮助生成线程,但我不太确定哪种设置组合会正确地发生以下情况:

  1. 一次只允许 1 次写入(当然顺序很重要)
  2. 允许写入有足够的时间进行每次写入(比如 5 秒),此时只需保释即可
  3. 如果写入缓慢,让 Executor 将写入收集到队列中并等待。
  4. 在线程队列为空之前,不要让整个程序退出。
  5. 由作者分隔线程。即,如果相同的写入器出现在此函数中,则将其放入自己的队列中。如果有不同的写入器指针进入,则给它自己的队列(无需将不同的写入器放在同一个队列中)。

我相信这可以通过结合执行器功能以及主程序对象上的.wait()and命令来完成。.notify()但是,我只是不确定如何精确地使用 executor API 来完成这项工作。

这是我得到的:

private void writeToFileInSeperateThread(final PrintWriter writer, final String text) {
  ExecutorService executor = Executors.newSingleThreadExecutor();
  try {
    executor.submit(new Thread(new Runnable() {
      public void run() {
        writer.println(text);
      }
    })).get(5L, TimeUnit.SECONDS);
  } catch (Exception e) {
    e.printStackTrace();
  }
  executor.shutdown();
}

该方法将在单个进程中被调用 100k+ 次,所以我不确定我是否应该ExcutorService每次都创建一个新实例,还是使用同一个实例?(在我尝试使用相同的方法时,我不断收到我认为与.newSingleThreadExecutor()指令有关的异常。

希望保持 Java 5 兼容,但 Java 6 没问题。在 Windows XP/7 上运行。

更新:这似乎在初始测试中起到了作用:

  private class WriterStringPair {
    public final PrintWriter writer;
    public final String text;

    public WriterStringPair(PrintWriter writer, String text) {
      this.writer = writer;
      this.text = text;
    }
  }

  private void writeTextInSeperateThread(Writer writer, String text) {
    try {
      textQueue.offer(new WriterStringPair(writer, text), 300L, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      errOut.println(e);
      e.printStackTrace();
    }
  }

  final BlockingQueue<WriterStringPair> textQueue = new ArrayBlockingQueue<WriterStringPair>(500);

  private void setWritingThread() {
    new Thread((new Runnable() {
      public void run() {
        WriterStringPair q;
        while (!shutdown && !Thread.currentThread().isInterrupted()) {
          try {
            q = textQueue.poll(1L, TimeUnit.SECONDS);
            if (q != null) {
              q.writer.write(q.text + "\n");
              q.writer.flush();
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }
    })).start();
  }
4

2 回答 2

3

在不了解有关您在“不稳定”网络上编写文件的更多详细信息及其含义的情况下,我们很难提供细节。但这里有一些事情需要考虑。

我会计算出有多少并发编写器可以为您提供最佳性能——或者在目标上提供最可靠的输出。然后,您应该启动固定数量的这些写入器,每个写入器都从一个共享的BlockingQueue(如果重要的话,每个写入器一个队列)消费。您应该很快超过您的 IO 或网络带宽,因此从 5 个左右的写入器开始,并根据需要增加或减少应该可以工作。

public void run() {
   writer.println(text);
}

是的,就每条线的工作而言,您不想做这种事情。最好将其String text放入 a中BlockingQueue<String>,然后让您的编写器Runnable类在该队列的出列中运行,ExecutorService并且仅在队列为空或设置shutdown布尔值时停止。

正如彼得所提到的,您需要小心使用排队的文本字符串填充内存。如果输入文本很大,您应该将您的限制设置BlockingQueue为几百左右。

我不确定是否应该ExecutorService每次都创建一个新实例,还是使用同一个?

当然,您应该拥有一个服务,而不是一遍又一遍地创建一个。

我相信这可以通过结合执行器功能以及主程序对象上的 .wait() 和 .notify() 命令来完成。

如果你写得正确,你不应该需要使用等待和通知。我有一个volatile boolean shutdown = false你所有的作家都看的。他们每个人都通过查看关机从文本队列中出列。就像是:

while (!shutdown && !Thread.currentThread().isInterrupgted()) {
    String text = textQueue.poll(1, TimeUnit.SECONDS);
    if (text != null) {
        // write the text
    }
}

如果写入失败或发生任何事情,您可以重试或任何必要的事情。

于 2013-07-17T18:31:41.380 回答
2

几个问题

  • println 不会告诉您是否存在 IOException,因此如果您想要一些防止错误的保护,这将无济于事。
  • 为每一行启动一个 ExecutorService 非常慢,比提交任务慢得多。
  • 创建大量任务不仅会非常慢,而且如果这样做的话,可能会耗尽你所有的内存。
  • 您将 Runnable,而不是 Threads 提交给 ExecutorService
  • shutdown 不会停止线程,例如它在写入时阻塞。这可能导致许多线程试图同时写入。

我建议将数据保存到 JMS 等本地系统或数据库或文件(例如 Java-Chronicle),并在可用时将数据复制到 NFS 的单独进程。

那是假设您无法修复 NFS,因此它不是片状的。

于 2013-07-17T18:32:15.047 回答