2

我有一个生产者/消费者设置:我们的客户端向我们提供服务器处理的数据,而我们的客户端通过不断写入文件向我们的服务器提供数据。我们的服务器使用 inotify 查找任何文件修改,并处理新数据。

问题:服务器中的文件阅读器有一个大小为 4096 的缓冲区。我有一个模拟上述情况的单元测试。测试不断写入一个打开的文件,文件阅读器不断尝试读取一个进程。但是,我注意到在读取了比 4096 小得多的第一条记录后,在 ifstream 对象中设置了一个错误标志。这意味着没有处理任何到达的新数据。一个简单的解决方法似乎是在每次读取后调用 ifstream::clear ,这确实解决了问题。但是,这是怎么回事?这是正确的解决方案吗?

4

2 回答 2

1

首先,根据您的系统,可能会或可能不会读取另一个进程写入的文件:在 Windows 上,打开文件时的正常设置使访问独占。我对 Window 了解不多,无法判断是否还有其他设置。在 POSIX 系统上,可以打开具有适当权限的文件以供不同进程读取和写入。从它的声音来看,您使用的是 Linux,即遵循 POSIX 规范的东西。

但是,在更改时轮询文件的方法并不完全理想:正如您所注意到的,每次到达当前文件的末尾时都会出现“错误”。实际上,到达文件末尾并不是真正的错误,但尝试解码超出文件末尾的内容是错误的。此外,仍然会设置超出文件末尾的读取std::ios_base::eofbit,因此,流不会是good(). 如果您坚持使用这种方法,那么除了阅读到文件末尾并以某种方式处理不完整的读取之外,别无选择。

但是,如果您可以控制创建文件,则可以做一个简单的技巧:您可以创建它,而不是让文件成为普通文件,而是使用写入程序将写入的文件名mkfifo创建一个命名管道:在 POSIX 系统上打开一个文件,如果已经有一个文件,它不会创建一个新文件,而是使用现有文件。好吧,文件或其他任何东西都由文件名寻址(除了文件和命名管道,您可能会看到目录、字符或块特殊设备,可能还有其他)。

命名管道是一种奇怪的野兽,旨在让两个进程相互通信:一个进程写入一端的内容可以在另一端被另一个进程读取!命名管道本身没有任何内容,即,如果您需要文件的内容以及与另一个进程的通信,您可能需要在某处复制内容。打开一个用于读取的命名管道,只要它到达文件的当前末尾,它将阻塞,即,最初读取将阻塞,直到有写入器。类似地,对命名管道的写入将阻塞,直到有读取器。一旦有两个进程与各自的另一端通信,在另一个进程退出后读取或写入命名管道时将收到错误。

于 2012-09-21T18:37:59.877 回答
0

如果您擅长一次又一次地打开和关闭文件,则此问题的正确解决方案是存储最后读取的位置,并在文件更新后从那里开始:

确切的算法将是:

  1. 设置 start_pos = 0 ,结束 pos =0
  2. 更新 end_pos = infile.tellg(),
  3. 将 get 指针移动到 start_pos(使用 seekg())并读取块(end_pos - start_pos)。
  4. 更新 start_pos = end_pos 然后关闭文件。
  5. 睡眠一段时间,然后再次打开文件。
  6. 如果文件流仍然不好,请关闭文件并跳转到步骤 5。
  7. 如果文件流良好,则跳至步骤 1。

所有 c++ 参考都在http://www.cplusplus.com/reference/istream/istream/seekg/ 你可以使用这里给出的示例代码。

确切的代码将是:

`
#include <iostream>     
#include <fstream>      

int main(int argc, char *argv[]) {

    if (argc != 2)
    {
        std::cout << "Please pass filename with full path \n";
        return -1;
    }

    int end_pos = 0, start_pos = 0;
    long length;
    char* buffer;
    char *filePath = argv[1];
    std::ifstream is(filePath, std::ifstream::binary);  

    while (1)
    {
        if (is) {

            is.seekg(0, is.end);
            end_pos = is.tellg(); //always update end pointer to end of the file  
            is.seekg(start_pos, is.beg); // move read pointer to the new start position 
            // allocate memory:
            length = end_pos - start_pos;
            buffer = new char[length];

            // read data as a block: (end_pos - start_pos) blocks form read pointer 
            is.read(buffer, length);    
            is.close();    
            // print content:
            std::cout.write(buffer, length);    
            delete[] buffer;
            start_pos = end_pos; // update start pointer    
        }

        //wait and restart with new data 
        sleep(1);
        is.open(filePath, std::ifstream::binary);    
    }    
    return 0;
}

`

于 2018-08-28T07:09:17.203 回答