
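This is part of a larger program that I've distilled down to a minimal, reproducible example in Node v14.4.0. In this code, nothing inside the for loop ever produces output.

This is the only output I see in the console: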

before for() loop
finished
finally
done

The for await (const line1 of rl1) loop never enters its body; it just skips over it:

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');

async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        await once(stream1, 'open');
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});

        const stream2 = fs.createReadStream(file2);
        await once(stream2, 'open');
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
})

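However, if I remove either of the await once(stream, 'open') statements, the for loop works exactly as expected (it lists all the lines of the rl1 file). So, apparently, there is some timing problem between the async iterator from the readline interface and the stream. Any idea what might be causing this or how to work around it?

FYI, the await once(stream, 'open') is there because of another bug in the async iterator: it does not reject if there is a problem opening the file, whereas await once(stream, 'open') causes you to properly get a rejection if the file can't be opened (essentially a pre-flight open).

If you're wondering why the stream2 code is there, it is used in the larger project, but I've reduced this to a minimal, reproducible example, and only this much code is needed to demonstrate the problem.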
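
As a rough sketch of the pre-flight idea above: events.once() rejects its promise if the emitter emits 'error' before the awaited event, so a failed open shows up as an ordinary rejection. The helper name openStream is hypothetical and exists only for this illustration.

```javascript
const fs = require('fs');
const { once } = require('events');

// Hypothetical helper (not part of the original code): resolve with the
// stream once it is open, or reject if the open fails.
async function openStream(path) {
    const stream = fs.createReadStream(path);
    // once() also listens for 'error' while waiting for 'open',
    // so a missing file turns into a rejected promise here.
    await once(stream, 'open');
    return stream;
}

// Usage: either branch may run, depending on whether the file exists.
openStream('data/numbers.txt')
    .then((stream) => {
        console.log('opened');
        stream.destroy();
    })
    .catch((err) => {
        console.log('open failed:', err.code);
    });
```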


Edit: In trying a slightly different implementation, I found that if I combine the two once(stream, "open") calls in a Promise.all(), it works. So, this works:

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const stream2 = fs.createReadStream(file2);
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
        // pre-flight file open to catch any open errors here
        // because of existing bug in async iterator with file open errors
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

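This obviously should not be sensitive to exactly how you wait for the files to open. There is a timing bug somewhere. I'd like to find that bug in either readline or the read stream and file it. Any ideas?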


3 Answers


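It turns out the underlying problem is that readline.createInterface(), as soon as it is called, adds a data event listener (as seen in the readline source) and resumes the stream to start it flowing.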

input.on('data', ondata);

input.resume();

Then, in the ondata listener, it parses the data into lines, and whenever it finds a line it immediately fires a line event.

for (let n = 0; n < lines.length; n++)
  this._onLine(lines[n]);

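However, in my example, other asynchronous things happen between the call to readline.createInterface() and the creation of the async iterator (which is what listens for line events). So line events are being emitted, but nothing is listening for them yet.

So, to work properly, readline.createInterface() requires that whatever is going to listen for line events be added synchronously after calling readline.createInterface(). Otherwise there is a race condition and line events may be lost.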
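
The underlying behaviour can be shown without readline at all. This minimal sketch uses a plain EventEmitter (the function name demoLostEvents is made up for illustration): an event emitted before any listener is attached is simply dropped, not queued.

```javascript
const { EventEmitter } = require('events');

// Illustration only: events emitted with no listener attached are lost.
function demoLostEvents() {
    const emitter = new EventEmitter();
    const received = [];

    emitter.emit('line', 'first');               // nobody is listening yet: dropped
    emitter.on('line', (line) => received.push(line));
    emitter.emit('line', 'second');              // delivered

    return received;
}

console.log(demoLostEvents()); // → [ 'second' ]
```

The readline interface is in the same position as this emitter: any line event that fires before the async iterator is created has no listener to receive it.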


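In my original code example, a reliable workaround is to not call readline.createInterface() until after the await once(...) has completed. Then the async iterator is created synchronously after readline.createInterface() is called.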

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const stream2 = fs.createReadStream(file2);
        // wait for both files to be open to catch any "open" errors here
        // since readline has bugs about not properly reporting file open errors
        // this await must be done before either call to readline.createInterface()
        // to avoid race conditions that can lead to lost lines of data
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

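One way to fix this general problem would be to change readline.createInterface() so that it does not add the data event listener and resume the stream until somebody adds a line event listener. That would prevent data loss. It would allow the readline interface object to sit quietly without losing data until the receiver of its output is actually ready. This would work for the async iterator, and it would also keep other uses of the interface that are mixed with other asynchronous code from losing line events.

A note about this has been added to the related open readline bug issue.

answered 2020-07-14T02:13:24.487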
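
The deferred-start idea can be approximated in user code today. The following is only a sketch under stated assumptions, not how readline itself works or is planned to work: createLazyLineEmitter is a hypothetical wrapper that uses the EventEmitter 'newListener' event to delay calling readline.createInterface() until the first line listener is attached. It forwards only line and close events and does not provide an async iterator.

```javascript
const { EventEmitter } = require('events');
const { PassThrough } = require('stream');
const readline = require('readline');

// Hypothetical wrapper: the readline interface is created (and the input
// resumed) only when the first 'line' listener is registered.
function createLazyLineEmitter(input) {
    const out = new EventEmitter();
    let rl = null;
    out.on('newListener', (eventName) => {
        if (eventName === 'line' && rl === null) {
            rl = readline.createInterface({ input, crlfDelay: Infinity });
            rl.on('line', (line) => out.emit('line', line));
            rl.on('close', () => out.emit('close'));
        }
    });
    return out;
}

// Usage with an in-memory stream; the listener is attached late on purpose.
const input = new PassThrough();
input.end('one\ntwo\n');
const lines = createLazyLineEmitter(input);
setTimeout(() => {
    lines.on('line', (line) => console.log(line));
}, 10);
```

Because the input stays paused until a consumer shows up, the delay before attaching the listener no longer loses lines.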

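It is also possible to replace the readline module with a simple Transform stream using the more modern stream API. The modern stream API supports async iterators out of the box, as well as backpressure (e.g., the write side of the stream (the file read) pauses until the read side of the stream (the line reads) has been consumed).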

const fs = require('fs');
const { Transform } = require('stream');

function toLines() {
    let remaining = '';
    return new Transform({
        writableObjectMode: false,
        readableObjectMode: true,
        transform(chunk, encoding, callback) {
            try {
                const lines = (remaining + chunk).split(/\r?\n/g);
                remaining = lines.pop();
                for (const line of lines) {
                    this.push(line);
                }
                callback();
            } catch (err) {
                callback(err);
            }
        },
        flush(callback) {
            if (remaining !== '') {
                this.push(remaining);
            }
            callback();
        }
    });
}


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1, { encoding: 'utf8' });
        const rl1 = stream1.pipe(toLines());

        const stream2 = fs.createReadStream(file2, { encoding: 'utf8' });
        const rl2 = stream2.pipe(toLines());

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

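This example does not support the readline module's crlfDelay option, but the algorithm could be modified to do something similar. It also (as far as I know) has better error handling than the readline module supports.

answered 2020-07-14T03:37:48.580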
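
To make the per-chunk step of toLines() easier to follow, here it is isolated as a pure function. The name splitChunk is hypothetical. It shows how a partial line at the end of one chunk is carried over and completed by the next chunk.

```javascript
// Hypothetical helper: the per-chunk step of toLines() as a pure function.
// Returns the complete lines plus whatever partial line is left over.
function splitChunk(remaining, chunk) {
    const lines = (remaining + chunk).split(/\r?\n/);
    const rest = lines.pop(); // last piece has no line ending yet
    return { lines, remaining: rest };
}

let state = splitChunk('', 'ab\ncd');
console.log(state); // → { lines: [ 'ab' ], remaining: 'cd' }

// 'cd' from the previous chunk is joined with the start of this one.
state = splitChunk(state.remaining, 'e\r\nf\n');
console.log(state); // → { lines: [ 'cde', 'f' ], remaining: '' }
```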

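If you create the async iterator immediately after constructing the readline interface, it works as expected. If you wait to create the async iterator, you may lose some lines, because the readline interface does not buffer line events, whereas once the async iterator exists, they are buffered.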

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');

async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        await once(stream1, 'open');
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});

        const rl1Iterator = rl1[Symbol.asyncIterator]();

        const stream2 = fs.createReadStream(file2);
        await once(stream2, 'open');
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1Iterator) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("stream.txt", "stream.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
})

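Per the discussion in the comments, this may still not be an ideal solution, since the readline module has various other problems, but I figured I would add an answer that addresses the issue as posed in the original question.

answered 2020-07-14T01:09:48.577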