我在后端使用 rxjs 和 NodeJS。我有一个允许消费者运行远程纱线安装过程的 Rest API。install 函数返回一个可观察的进程。因此,当模块安装成功时,它会在 observable 和 complete 中发出一个值。此时,Rest API 会返回一个响应给用户说安装成功。如果安装失败,该过程将在流中抛出一个错误,并且 Rest API 返回另一个带有错误信息的响应。
我的问题是:
该 API 被消费者并行调用多次,因此后端会有并行安装。
我尝试油门操作员创建一个队列,但它保持第一个流处于活动状态。所以如果第一个进程“完成”,它返回“真”但流没有完成
export class MyService {
// the function called by the REST API
installGlobal(moduleName: string): Observable < boolean > {
// I think, there are something to do here to make it queuing
return this.run('yarn', ['global', 'add', moduleName]);
}
private run(cmd: string, args: string[]): Observable < boolean > {
const cmd$ = fromPromise(spawn(cmd, args)).pipe(
map(stdout => {
this.logger.info(`Install Module Successfully`);
this.logger.info(`stdout: ${stdout.toString()}`);
return true;
}),
catchError(error => {
const errorMessage: string = error.stderr.toString();
return _throw(errorMessage.substr(errorMessage.indexOf(' ') + 1));
})
);
return cmd$;
}
}
我的期望:
要么有多个请求,要么必须排队。所以第一个将被处理,所有并行的一次都必须排队。当第一个被处理时,它必须将响应返回给 API 消费者(如 200 完成)并从队列中恢复下一个流。
[UPDATE-01 July 2019]:添加示例
您可以在stackblitz获取代码演示
我已经重新实现了现有代码,并且通过多次订阅将调用队列的服务来模拟我的 API 调用