1

所以我的代码应该从 CSV 文件中读取一些行,将它们转换为 JSON 对象数组,然后返回该数组。要将文件作为流读取,我使用got,然后在fast-csv. 为了返回结果数组,我将整个东西放入 Promise 中,如下所示:

async GetPage() : Promise<{OutputArray:any[], StartingIndex:number}>{
        return new Promise(async (resolve, reject) => {
            const output:any[] = []; 
            const startingIndex = this.currentLocation;
            try{
                parseStream(this.source, {headers:true, maxRows:this.maxArrayLength, skipRows:this.currentLocation, ignoreEmpty:true, delimiter:this.delimiter})
                    .on('error', error => console.log(`parseStream: ${error}`))
                    .on('data', row => {
                        const obj = this.unflatten(row); // data is flattened JSON, need to unflatten it
                        output.push(obj); // append to output array
                        this.currentLocation++;
                    })
                    .on('end', (rowCount: number) => {
                        console.log(`Parsed ${this.currentLocation} rows`);
                        resolve({OutputArray:output, StartingIndex:startingIndex});
                    });
            }
            catch(ex){
                console.log(`parseStream: ${ex}`);
                throw new Error(ex);
            }
        })
    }

现在,当我调用它一次 ( await GetPage()) 时,它工作得非常好。问题是当我连续第二次调用它时。我得到以下信息:

UnhandledPromiseRejectionWarning: Error: Failed to pipe. The response has been emitted already.

我在这里看到了一个类似的案例:https://github.com/sindresorhus/file-type/issues/342但据我所知,这是一个不同的案例,或者更确切地说,如果它是相同的,我不知道如何在此处应用解决方案。

这是在构造函数中给定的GetPage类中的一个方法,我像这样创建 Readable:CSVStreamParserReadablereadable:Readable = got.stream(url)

让我困惑的是,我的第一个版本的 GetPage 没有包含 Promise,而是接受了一个回调(我只是发送console.log来测试它),当我连续调用它几次时没有错误,但它无法返回值,所以我将其转换为 Promise。

谢谢!:)

编辑:我已经设法通过在开始时重新打开流来使其工作GetPage(),但我想知道是否有一种方法可以在不必这样做的情况下达到相同的结果?有没有办法保持流打开?

4

1 回答 1

0

首先,删除两个async, 因为您已经返回了Promise.

然后删除try/catch块,throw因为你不应该做出承诺。而是使用该reject功能。

GetPage() : Promise<{OutputArray:any[], StartingIndex:number}>{
    return new Promise((resolve, reject) => {
        const output:any[] = []; 
        const startingIndex = this.currentLocation;
        parseStream(this.source, {headers:true, maxRows:this.maxArrayLength, skipRows:this.currentLocation, ignoreEmpty:true, delimiter:this.delimiter})
            .on('error', error => reject(error))
            .on('data', row => {
                const obj = this.unflatten(row); // data is flattened JSON, need to unflatten it
                output.push(obj); // append to output array
                this.currentLocation++;
            })
            .on('end', (rowCount: number) => {
                console.log(`Parsed ${this.currentLocation} rows`);
                resolve({OutputArray:output, StartingIndex:startingIndex});
            });
    });
}

这里有一些资源可以帮助您了解异步函数承诺

于 2021-11-25T16:13:10.960 回答