目前,我正在实现一个带有 CSV 文件流的管道,以将其行写入具有特定模型的数据库。我开始将所有内容都写入标准输出。当我附加我的自定义(转换)产品地图流时,这很快就会出现意外行为。
起初我只使用 fs 读取流,将其传送到 csv 转换器(基于 npm 包 csv-streamify)并写入 process.stdout。一切都像往常一样流动。
但后来我连接了我的自定义变压器,它开始表现得很奇怪。因为只要我将任何操作(JSON.parse、typeof 块)应用到转换中的块,数据就不会直接流向标准输出,但正如它所显示的那样,只有当读取流完成时。
有人知道为什么会这样吗?
我的管道:
const filePath = path.join(__dirname, 'file1.csv');
//From NPM module: csv-streamify
const csvTransformer = csv({objectMode: false, columns: true});
const mapper = new CsvMapper();
const productMapStream = ProductMapStream.create(mapper);
const writeStream = ProductWriteStream.create(this.connectionPool);
fs.createReadStream(filePath)
.pipe(csvTransformer)
.pipe(csvProductMapStream)
.pipe(process.stdout)
.on('finish', () => resolve('Ok'))
我的自定义转换流:
export class ProductMapStream {
static create(mapper: ProductMappable) {
return new Transform({
transform(chunk: any, enc, callback) {
try {
const chunkAsString = chunk.toString('utf8');
// This already prevents continuous flowing
const newString = '' + (typeof chunkAsString);
this.push(newString);
callback(null);
} catch (e) {
callback(e);
}
}
}).once('error', console.log)
}
}
编辑:
根据@PatrickDillon 的评论进行了一些实验后,我发现这个问题仅在它在 Docker 容器中运行时才会出现。我尝试了基于 Docker 节点映像的不同节点版本。我从 node:8.12.0-alpine 开始,我也尝试过 node:10.11.0-jessie 但不幸的是行为没有区别。有人知道将 Docker 与 fs 或流或任何可能相关的东西一起使用时的特殊行为吗?