0

我们有一个多线程程序,它执行以下操作:

thread_1是硬盘的监听器,用于检测创建的​​新文件。我们WatchService在 Java 7 中使用 api。当另一个程序创建新文件时,thread_1检测并获取它并将其放入PriorityBlockingQueueex:

priorityBlockingQueue.add(FileObject)

FileObjComparator是一个自定义对象实现比较器。当检测到此文件时,它按我从系统时间获取的创建时间和fileCreatedTime字段排序:FileObject

 public int compare(FileObject o1, FileObject o2) {
        return o1.getFileCreatedTime().compareTo(o2.getFileCreatedTime());
    }

priorityBlockingQueue初始化为:

DataFileQueue.priorityBlockingQueue = new PriorityBlockingQueue<FileObject>(100000, new FileObjComparator());

并将Thread_2在此文件中的最后一个文件旁边处理它priorityBlockingQueue

if(priorityBlockingQueue.size) > 1)
   process(priorityBlockingQueue.poll());

2 个线程并行运行,但是当我处理许多大文件时,有时会在写入文件时Thread_2处理它。我检测到这一点是因为重新检查内容文件和处理结果。

该程序在 Centos 6.2 上运行,该硬盘分区以异步模式挂载。谢谢你的帮助。

4

4 回答 4

2

如果您真的在处理倒数第二个文件,那么我很惊讶它的大小正在增长,除非多个进程或线程正在生成输入文件。确保创建文件的另一个进程在写入下一个文件之前刷新并关闭每个文件。

  • 您可以分块读取文件,然后在一段时间内返回以查看是否向文件中添加了任何其他数据,并在当时使用RandomAccessFile. 如果您正在逐行阅读文件,那么不幸的是,您需要自己进行分页。如果文件是基于行的,那么您应该确保行终止字符关闭文件。

  • 您可以尝试的另一件事是稍微延迟文件的处理以让文件系统刷新其缓冲区。丑陋且不可靠,但也许是必要的。

  • 如果您可以调整输出过程,那么您可以用魔术字符串结束文件,然后在看到魔术字符串之前不处理文件。

  • 您可以让进程写入文件,将文件的大小写入带有“.size”扩展名(或其他内容)的单独文件中。大小文件将帮助您验证您正在读取的字符数是否正确。

  • 要尝试的另一件事是,Runtime.exec("/bin/sync");如果您在 ~unix 系统上运行,则在开始从文件读取以同步文件系统之前。问题是对此的支持高度依赖于操作系统。它也可以成为真正的性能杀手。他是我 Mac 上的手册页:

    可以调用同步实用程序以确保所有磁盘写入都已完成

于 2012-07-04T16:55:32.973 回答
1

您可以尝试使用信号量来组织对每个文件的访问,因为一次不会有多个线程写入任何文件。我认为每个文件对象都应该有自己的信号量,每个线程都应该在写入文件之前尝试获取信号量。

于 2012-07-04T16:46:26.587 回答
0

您的比较器应按上次修改时间排序,而不是创建时间。例如,我不知道您如何知道以 A、B 顺序打开的两个文件将完全以相同的顺序写入,除非您肯定知道文件生成是严格按顺序进行的。你还没说呢。

于 2012-07-07T02:48:38.463 回答
-2

编辑更详细的答案。

问题是 ...

你写道

它是按创建时间和 FileObject 中的 fileCreatedTime 字段排序的,我在检测到这个文件时从系统时间获得: ...

thread_1 是硬盘监听器,用于检测创建的​​新文件。我们在 Java 7 中使用 WatchService api。当另一个程序创建新文件时。... thread_1 检测并获取它并将其放入 PriorityBlockingQueue ex

  • 创建时间和“文件写入完成时间”可能有很大不同。(取决于文件大小)。

例如:

打开文件管理器。开始下载大约 60 mb 的文件。注意创建时间。大约 3 分钟后,看看最后的时间。

要检测新文件,查看创建时间是“将其放入 PriorityBlockingQueue ex :”的错误时刻

thraed_1 必须等到文件写入完成。然后他可以把它放到“一个 PriorityBlockingQueue ex :”

如何检测文件的写入是否完成?

3个不太复杂的选项

  • a.) 比较文件已创建和文件就绪时间。 或者
  • b.) 观察文件的大小正在稳步增长。如果文件完成,它将停止增长。 或者
  • c.) 尝试将其移动到临时文件夹。

你更喜欢什么?

我更喜欢解决方案c。

无法移动为写入而打开的文件。在第3 方程序关闭文件后,它可以被移动。

必要的步骤。

  • thread_1 正在监视由3rd 方程序创建的文件。
  • thread_1 试图将其移动到 xyztmp 文件夹(每 10 或 20 或 ... 秒)。
  • thread_1 在 xyztmp 文件夹中寻找新的传入文件并将其放入 PriorityBlockingQueue ex。

解决方案 b. 更复杂。

thread_1 将传入的文件名和大小放在一个控制数组中以比较 3-5 次。(每 5 秒或更长时间)。

大批

(filenamexyz.dat, size1, size2, size3, ...).
(filenameabc.dat, size1, size2, size3, ...).
(filenamefgh.dat, size1, size2, size3, ...).
....

如果每 5 个比较大小由名称标识的文件相同,则第 3 方程序已完成对该文件的写入。

现在可以将其放入 PriorityBlockingQueue 例如:

让我们一步一步看

我们假设 thread_2 在 list.size 为 2 时启动!

  • 3rd 方程序开始一一写入文件。
  • 第 3 方程序开始写入 FILE_1。
  • thread_1 检测到创建的 FILE_1,将其放入列表中。
  • 第 3 方程序完成写入 FILE_1。
  • 第 3 方程序开始写入 FILE_2。
  • thread_1 检测到创建的 FILE_2,将其放入列表中。
  • if(priorityBlockingQueue.size) > 1) 真
  • thread_2 从读取和处理列表 FILE_1 中的第一个文件开始。

  • 第 3 方程序完成写入 FILE_2。
  • 第 3 方程序开始写入 FILE_3。
  • thread_1 检测到创建的 FILE_3,将其放入列表中。
  • thread_2 完成处理 FILE_1。
  • thread_2 从列表 FILE_2 中的下一个文件开始。

  • 第 3 方程序完成写入 FILE_3。
  • 第 3 方程序开始写入 FILE_4。
  • thread_1 检测到创建的 FILE_4,将其放入列表中。
  • thread_2 完成处理 FILE_2。
  • thread_2 从列表 FILE_3 中的下一个文件开始。

    现在麻烦开始了


  • 第 3 方程序完成写入 FILE_4。
  • 第 3 方程序开始写入 FILE_5。(FILE_5 大于 FILE_4)。
  • thread_1 检测到创建的 FILE_5,将其放入列表中。
  • thread_2 完成处理 FILE_3。
  • thread_2 从列表 FILE_4 中的下一个文件开始。
  • thread_2 完成处理 FILE_4。
  • thread_2 从列表 FILE_5 中的下一个文件开始。
  • thread_2 完成处理 FILE_5。
  • 第 3 方程序完成写入 FILE_5。

如果第 3 方程序写入的文件较大,需要更多时间来写入,并且 thread_2 已完成读取较小的 FILE_4 。

thread_2 将下一个文件从列表中取出 - FILE_5,无论该文件是否已准备好读取。

FILE_5 是第 3 方程序仍在写入的文件。FILE_5 是thread_2 正在读取和处理的文件。thread_2读取的字节只是第3 方程序此时写入的字节。

于 2012-07-04T18:34:59.737 回答