
How do I read and split/chunk a file by line count?

I want to divide a file into separate buffers while making sure that no line is split across two or more buffers. I plan to hand each buffer off to its own pthread so they can do some kind of simultaneous/asynchronous processing.

I have read the answer below about chunked reading and writing using C on Linux, but I don't think it quite answers the question of making sure a line doesn't end up split across two or more buffers.


3 Answers


How is the file encoded? If each byte represents a character, I would do the following:

  1. Memory-map the file using mmap().
  2. Tell the jobs their approximate start and end by computing them from an appropriate chunk size.
  3. Have each job find its actual start and end by searching for the next '\n'.
  4. Process the individual chunks concurrently.
  5. Note that the first chunk needs special handling, because its start is not approximate but exact.
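The boundary adjustment in steps 3 and 5 might look like the following sketch, assuming the file is already mapped into memory (the function and parameter names are mine, not part of the original answer):

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>   // std::memchr

// Adjust an approximate chunk start to the first byte after the next '\n'
// (step 3). The first chunk (approx == 0) is exact and returned unchanged
// (step 5). `base` points at the mmap()ed file, `len` is its size.
const char *chunk_start(const char *base, std::size_t approx, std::size_t len)
{
    if (approx == 0) return base;          // first chunk: exact start
    if (approx >= len) return base + len;  // past the end: empty chunk
    const char *nl = static_cast<const char *>(
        std::memchr(base + approx, '\n', len - approx));
    return nl ? nl + 1 : base + len;       // no newline left: empty chunk
}
```

Each job would call this for its own approximate start and use the next job's adjusted start as its own end.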
Answered 2012-11-20T23:55:58.203

I would pick a chunk size in bytes. Then I would seek to the appropriate position in the file and read some small number of bytes at a time until I got a newline.

The last character of the first chunk is the newline. The first character of the second chunk is the character after the newline.

Always seek to a pagesize() boundary and read pagesize() bytes at a time to search for the newline. That will tend to ensure that you pull only the minimum you need from disk to find your boundaries. You could try reading, say, 128 bytes at a time, but then you risk making more system calls.
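A minimal sketch of that alignment step, assuming a POSIX system (the helper name is mine):

```cpp
#include <unistd.h>     // sysconf, off_t

// Round a byte offset down to the start of its page, so each read
// pulls in whole pages from disk (illustrative helper; name is mine).
inline off_t page_align(off_t off)
{
    const long ps = ::sysconf(_SC_PAGESIZE);
    return off - (off % ps);
}
```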

I wrote an example program that computes letter frequencies. Of course, this is largely pointless since it's almost certainly IO bound. And the positions of the newlines don't matter either, since it isn't line-oriented. But it's just an example. Also, it relies heavily on you having a fairly complete C++11 implementation.

The key function is this:

#include <algorithm>     // ::std::find
#include <cerrno>
#include <cstddef>
#include <system_error>
#include <unistd.h>      // pread, off_t, ssize_t

// Find the offset of the next newline given a particular desired offset.
off_t next_linestart(int fd, off_t start)
{
   using ::std::size_t;
   using ::ssize_t;
   using ::pread;

   const size_t bufsize = 4096;
   char buf[bufsize];
   for (bool found = false; !found;) {
      const ssize_t result = pread(fd, buf, bufsize, start);
      if (result < 0) {
         throw ::std::system_error(errno, ::std::system_category(),
                                   "Read failure trying to find newline.");
      } else if (result == 0) {
         // End of file
         found = true;
      } else {
         const char * const nl_loc = ::std::find(buf, buf + result, '\n');
         if (nl_loc != (buf + result)) {
            start += ((nl_loc - buf) + 1);
            found = true;
         } else {
            start += result;
         }
      }
   }
   return start;
}

Also notice that I use pread. This is absolutely essential when you have multiple threads reading from different parts of the file.

The file descriptor is a shared resource between your threads. When one thread reads from the file using ordinary functions it alters a detail about this shared resource, the file pointer. The file pointer is the position in the file at which the next read will occur.

Simply using lseek before each read does not help, because it introduces a race condition between the lseek and the read.

The pread function allows you to read a bunch of bytes from a specific location within the file without altering the file pointer at all. Apart from that, it's like combining an lseek and a read in the same call.
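Putting this together, the per-thread chunk boundaries might be computed as in the following sketch (chunk_boundaries is my name, not the author's; the newline finder, which would be next_linestart above in practice, is passed in as a callable to keep the sketch self-contained):

```cpp
#include <functional>
#include <sys/types.h>  // off_t
#include <vector>

// Split [0, filesize) into nchunks ranges whose internal boundaries fall
// just after a '\n'. bounds[i]..bounds[i+1] is the i-th thread's chunk.
std::vector<off_t> chunk_boundaries(
    off_t filesize, int nchunks,
    const std::function<off_t(off_t)> &next_linestart)
{
    std::vector<off_t> bounds;
    bounds.push_back(0);                      // first chunk starts exactly at 0
    for (int i = 1; i < nchunks; ++i)
        bounds.push_back(next_linestart(filesize * i / nchunks));
    bounds.push_back(filesize);               // last chunk ends at EOF
    return bounds;
}
```

Adjacent boundaries can coincide (an empty chunk) when lines are long relative to the chunk size; the workers just skip those.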

Answered 2012-11-20T23:56:14.153

Define a class for the buffers. Give each one a large buffer space that is some multiple of page size and a start/end index, a method that reads the buffer from a passed-in stream and a 'lineParse' method that takes another *buffer instance as a parameter.

Make some *buffers and store them on a producer-consumer pool queue. Open the file, get a buffer from the pool and read into its buffer space from start to end, (returning a boolean for error/EOF). Get another *buffer from the pool and pass it into the lineParse() of the earlier one. In there, search backwards from the end of the data, looking for a newline. When found, reload the end index and memcpy the fragment of the last line, (if there is one - you might occasionally be lucky:), into the new, passed *buffer and set its start index.

The first buffer now has whole lines and can be queued off to the thread/s that will process the lines. The second buffer has the fragment of line copied from the first, and more data can be read from disk into its buffer space at its start index.
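A hedged sketch of the buffer and its lineParse() step as described above (the class layout and member names are mine, not the author's):

```cpp
#include <cstddef>
#include <cstring>   // std::memcpy

// Sketch of the buffer described above; layout and names are mine.
struct Buffer {
    static const std::size_t SPACE = 1 << 16;  // some multiple of page size
    char data[SPACE];
    std::size_t start;   // index of the first valid byte
    std::size_t end;     // index one past the last valid byte
    Buffer() : start(0), end(0) {}

    // Move the trailing partial line into `next` so this buffer ends on a
    // complete line. Returns false if the buffer holds no newline at all.
    bool lineParse(Buffer &next) {
        std::size_t i = end;
        while (i > start && data[i - 1] != '\n') --i;
        if (i == start) return false;             // no newline found
        const std::size_t frag = end - i;         // bytes of the broken line
        std::memcpy(next.data, data + i, frag);   // may be 0 bytes: lucky case
        next.start = 0;
        next.end = frag;      // the next disk read continues at next.end
        end = i;              // this buffer now holds whole lines only
        return true;
    }
};
```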

The line-processing thread/s can recycle the 'used' *buffers back to the pool.

Keep going until EOF, (or error:).

If you can, add a method to the buffer class that does the processing of the buffer.

Using large buffer classes and parsing back from the end will be more efficient than continually reading small bits, looking for newlines from the start. Inter-thread comms is slow, and the larger the buffers you can pass, the better.

Using a pool of buffers eliminates continual new/delete and provides flow-control - if the disk read thread is faster than the processing, the pool will empty and the disk read thread will block on it until some used buffers are recycled. This prevents memory runaway.
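The pool itself can be a simple blocking queue; a sketch using C++11 primitives (the class is mine, not the author's):

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

// Blocking producer-consumer pool as described above (names are mine).
template <typename T>
class Pool {
    std::queue<T *> q_;
    std::mutex m_;
    std::condition_variable cv_;
public:
    void put(T *b) {                  // recycle a used buffer
        {
            std::lock_guard<std::mutex> l(m_);
            q_.push(b);
        }
        cv_.notify_one();
    }
    T *get() {                        // blocks while the pool is empty,
        std::unique_lock<std::mutex> l(m_);   // throttling the reader
        cv_.wait(l, [this] { return !q_.empty(); });
        T *b = q_.front();
        q_.pop();
        return b;
    }
};
```

The disk-read thread calls get() for fresh buffers and the processing threads call put() to recycle them, which gives the flow control described above for free.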

Note that if you use more than one processing thread, the buffers may get processed 'out-of-order' - this may, or may not, matter.

You can only gain in this scenario by ensuring that the advantage of lines being processed in parallel with disk-read latencies is greater than the overhead of inter-thread comms - communicating small buffers between threads is very likely to be counter-productive.

The biggest speedup would be experienced with networked disks that are fast overall, but have large latencies.

Answered 2012-11-21T10:07:00.423