16

我需要找出使用 node.js 实时读取写入文件的数据的最佳方法。麻烦的是,Node 是一艘快速移动的船,这使得找到解决问题的最佳方法变得困难。

我想要做什么
我有一个 java 进程正在做某事,然后将它所做的事情的结果写入一个文本文件。它通常需要 5 分钟到 5 小时来运行,数据一直在写入,并且可以达到相当高的吞吐率(大约 1000 行/秒)。

我想实时读取这个文件,然后使用节点聚合数据并将其写入可以在客户端上绘制图形的套接字。

客户端、图形、套接字和聚合逻辑都已完成,但我对读取文件的最佳方法感到困惑。

我尝试过(或至少玩过)
FIFO - 我可以告诉我的 Java 进程写入 fifo 并使用节点读取它,这实际上是我们目前使用 Perl 实现的方式,但是因为其他一切都在节点中运行将代码移植过来是有意义的。

Unix Sockets- 如上。

fs.watchFile- 这能满足我们的需要吗?

fs.createReadStream- 这比 watchFile 好吗?

fs& tail -f- 似乎是一个黑客。

实际上,我
倾向于使用 Unix 套接字的问题是什么,这似乎是最快的选择。但是 node 是否有更好的内置功能来实时从 fs 读取文件?

4

4 回答 4

9

如果您想将文件保留为数据的持久存储以防止在系统崩溃或正在运行的进程网络中的一个成员死亡的情况下丢失流,您仍然可以继续写入文件并读取从中。

如果您不需要此文件作为 Java 进程生成的结果的持久存储,那么使用 Unix 套接字在易用性和性能方面都要好得多。

fs.watchFile()这不是您所需要的,因为它在文件系统报告它时适用于文件统计信息,并且由于您想读取已经写入的文件,所以这不是您想要的。

简短更新:我很遗憾地意识到,虽然我fs.watchFile()在上一段中指责使用文件统计信息,但我自己在下面的示例代码中做了同样的事情!虽然我已经警告读者“小心!” 因为我只用了几分钟就写好了,甚至没有测试好;不过,如果底层系统支持它,它可以通过使用fs.watch()而不是做得更好。watchFilefstatSync

为了从文件中读取/写入,我刚刚在下面写了一些有趣的东西:

test-fs-writer.js : [你不需要这个,因为你在你的 Java 进程中写文件]

var fs = require('fs'),
    lineno=0;

var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'});

stream.on('open', function() {
    console.log('Stream opened, will start writing in 2 secs');
    setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000);
});

test-fs-reader.js : [注意,这只是演示,检查 err 对象!]

var fs = require('fs'),
    bite_size = 256,
    readbytes = 0,
    file;

fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); });

function readsome() {
    var stats = fs.fstatSync(file); // yes sometimes async does not make sense!
    if(stats.size<readbytes+1) {
        console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!');
        setTimeout(readsome, 3000);
    }
    else {
        fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome);
    }
}

function processsome(err, bytecount, buff) {
    console.log('Read', bytecount, 'and will process it now.');

    // Here we will process our incoming data:
        // Do whatever you need. Just be careful about not using beyond the bytecount in buff.
        console.log(buff.toString('utf-8', 0, bytecount));

    // So we continue reading from where we left:
    readbytes+=bytecount;
    process.nextTick(readsome);
}

您可以安全地避免使用nextTickreadsome()直接调用。由于我们仍在此处进行同步,因此在任何意义上都没有必要。我只是喜欢它。:p

奥利弗·劳埃德编辑

以上面的例子为例,但将其扩展为读取 CSV 数据给出:

var lastLineFeed,
    lineArray;
function processsome(err, bytecount, buff) {
    lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n');

    if(lastLineFeed > -1){

        // Split the buffer by line
        lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n');

        // Then split each line by comma
        for(i=0;i<lineArray.length;i++){
            // Add read rows to an array for use elsewhere
            valueArray.push(lineArray[i].split(','));
        }   

        // Set a new position to read from
        readbytes+=lastLineFeed+1;
    } else {
        // No complete lines were read
        readbytes+=bytecount;
    }
    process.nextTick(readFile);
}
于 2012-06-27T18:47:44.317 回答
7

为什么你认为tail -f是黑客?

在弄清楚我找到了一个很好的例子时,我会做类似的事情。使用 node.js 和 WebSocket 的实时在线活动监控示例:http:
//blog.new-bamboo.co.uk/2009/12/7/real-time-online-activity-monitor-example-with-node-js -and-websocket

只是为了使这个答案完整,我给你写了一个示例代码,它可以在 0.8.0 下运行 - (http 服务器可能是一个 hack)。

一个子进程是用 tail 产生的,因为子进程是一个带有三个流的 EventEmitter(我们在我们的例子中使用 stdout),你可以添加一个监听器on

文件名:tailServer.js

用法:node tailServer /var/log/filename.log

var http = require("http");
var filename = process.argv[2];


if (!filename)
    return console.log("Usage: node tailServer filename");

var spawn = require('child_process').spawn;
var tail = spawn('tail', ['-f', filename]);

http.createServer(function (request, response) {
    console.log('request starting...');

    response.writeHead(200, {'Content-Type': 'text/plain' });

    tail.stdout.on('data', function (data) {
      response.write('' + data);                
    });
}).listen(8088);

console.log('Server running at http://127.0.0.1:8088/');
于 2012-06-27T18:05:14.540 回答
1

这个模块是@hasanyasin 建议的原则的实现:

https://github.com/felixge/node-growth-file

于 2012-07-05T00:12:20.627 回答
0

我从@hasanyasin 那里得到了答案,并将其包装成一个模块化的承诺。基本思想是传递一个文件和一个处理函数,该函数对从文件中读取的字符串化缓冲区执行某些操作。如果处理函数返回 true,则文件将停止读取。您还可以设置一个超时,如果处理程序没有足够快地返回 true,它将终止读取。

如果 resolve() 由于超时而被调用,则 promiser 将返回 true,否则将返回 false。

使用示例见底部。

// https://stackoverflow.com/a/11233045

var fs = require('fs');
var Promise = require('promise');

class liveReaderPromiseMe {
    constructor(file, buffStringHandler, opts) {
        /*
            var opts = {
                starting_position: 0,
                byte_size: 256,
                check_for_bytes_every_ms: 3000,
                no_handler_resolution_timeout_ms: null
            };
        */

        if (file == null) {
            throw new Error("file arg must be present");
        } else {
            this.file = file;
        }

        if (buffStringHandler == null) {
            throw new Error("buffStringHandler arg must be present");
        } else {
            this.buffStringHandler = buffStringHandler;
        }

        if (opts == null) {
            opts = {};
        }

        if (opts.starting_position == null) {
            this.current_position = 0;
        } else {
            this.current_position = opts.starting_position;
        }

        if (opts.byte_size == null) {
            this.byte_size = 256;
        } else {
            this.byte_size = opts.byte_size;
        }

        if (opts.check_for_bytes_every_ms == null) {
            this.check_for_bytes_every_ms = 3000;
        } else {
            this.check_for_bytes_every_ms = opts.check_for_bytes_every_ms;
        }

        if (opts.no_handler_resolution_timeout_ms == null) {
            this.no_handler_resolution_timeout_ms = null;
        } else {
            this.no_handler_resolution_timeout_ms = opts.no_handler_resolution_timeout_ms;
        }
    }


    startHandlerTimeout() {
        if (this.no_handler_resolution_timeout_ms && (this._handlerTimer == null)) {
            var that = this;
            this._handlerTimer = setTimeout(
                function() {
                    that._is_handler_timed_out = true;
                },
                this.no_handler_resolution_timeout_ms
            );
        }
    }

    clearHandlerTimeout() {
        if (this._handlerTimer != null) {
            clearTimeout(this._handlerTimer);
            this._handlerTimer = null;
        }
        this._is_handler_timed_out = false;
    }

    isHandlerTimedOut() {
        return !!this._is_handler_timed_out;
    }


    fsReadCallback(err, bytecount, buff) {
        try {
            if (err) {
                throw err;
            } else {
                this.current_position += bytecount;
                var buff_str = buff.toString('utf-8', 0, bytecount);

                var that = this;

                Promise.resolve().then(function() {
                    return that.buffStringHandler(buff_str);
                }).then(function(is_handler_resolved) {
                    if (is_handler_resolved) {
                        that.resolve(false);
                    } else {
                        process.nextTick(that.doReading.bind(that));
                    }
                }).catch(function(err) {
                    that.reject(err);
                });
            }
        } catch(err) {
            this.reject(err);
        }
    }

    fsRead(bytecount) {
        fs.read(
            this.file,
            new Buffer(bytecount),
            0,
            bytecount,
            this.current_position,
            this.fsReadCallback.bind(this)
        );
    }

    doReading() {
        if (this.isHandlerTimedOut()) {
            return this.resolve(true);
        } 

        var max_next_bytes = fs.fstatSync(this.file).size - this.current_position;
        if (max_next_bytes) {
            this.fsRead( (this.byte_size > max_next_bytes) ? max_next_bytes : this.byte_size );
        } else {
            setTimeout(this.doReading.bind(this), this.check_for_bytes_every_ms);
        }
    }


    promiser() {
        var that = this;
        return new Promise(function(resolve, reject) {
            that.resolve = resolve;
            that.reject = reject;
            that.doReading();
            that.startHandlerTimeout();
        }).then(function(was_resolved_by_timeout) {
            that.clearHandlerTimeout();
            return was_resolved_by_timeout;
        });
    }
}


module.exports = function(file, buffStringHandler, opts) {
    try {
        var live_reader = new liveReaderPromiseMe(file, buffStringHandler, opts);
        return live_reader.promiser();
    } catch(err) {
        return Promise.reject(err);
    }
};

然后像这样使用上面的代码:

var fs = require('fs');
var path = require('path');
var Promise = require('promise');
var liveReadAppendingFilePromiser = require('./path/to/liveReadAppendingFilePromiser');

var ending_str = '_THIS_IS_THE_END_';
var test_path = path.join('E:/tmp/test.txt');

var s_list = [];
var buffStringHandler = function(s) {
    s_list.push(s);
    var tmp = s_list.join('');
    if (-1 !== tmp.indexOf(ending_str)) {
        // if this return never occurs, then the file will be read until no_handler_resolution_timeout_ms
        // by default, no_handler_resolution_timeout_ms is null, so read will continue forever until this function returns something that evaluates to true
        return true;
        // you can also return a promise:
        //  return Promise.resolve().then(function() { return true; } );
    }
};

var appender = fs.openSync(test_path, 'a');
try {
    var reader = fs.openSync(test_path, 'r');
    try {
        var options = {
            starting_position: 0,
            byte_size: 256,
            check_for_bytes_every_ms: 3000,
            no_handler_resolution_timeout_ms: 10000,
        };

        liveReadAppendingFilePromiser(reader, buffStringHandler, options)
        .then(function(did_reader_time_out) {
            console.log('reader timed out: ', did_reader_time_out);
            console.log(s_list.join(''));
        }).catch(function(err) {
            console.error('bad stuff: ', err);
        }).then(function() {
            fs.closeSync(appender);
            fs.closeSync(reader);
        });

        fs.write(appender, '\ncheck it out, I am a string');
        fs.write(appender, '\nwho killed kenny');
        //fs.write(appender, ending_str);
    } catch(err) {
        fs.closeSync(reader);
        console.log('err1');
        throw err;
    }
} catch(err) {
    fs.closeSync(appender);
        console.log('err2');
    throw err;
}
于 2017-06-14T05:55:47.753 回答