我正在实现一个批量获取和处理请求的查询引擎。我正在使用异步/等待。
现在执行流程在一个层次结构中运行,其中有一个包含查询的项目列表,每个查询都有一个获取。
我想要做的是将项目捆绑在 n 组中,所以即使每个项目都有 m 个查询,里面有 fetches,也只有 n*m 个请求同时运行;特别是只有一个请求将同时向同一个域发出。
问题是,当我等待项目的执行时(在外部级别,在一段时间内将项目分组并停止迭代,直到承诺解决),当内部查询的执行被推迟时,这些承诺正在解决,因为获取的内部等待。
这导致我的排队只是暂时停止,而不是等待内部承诺解决。
这是外部的排队类:
class AsyncItemQueue {
constructor(items, concurrency) {
this.items = items;
this.concurrency = concurrency;
}
run = async () => {
let itemPromises = [];
const bundles = Math.ceil(this.items.length / this.concurrency);
let currentBundle = 0;
while (currentBundle < bundles) {
console.log(`<--------- FETCHING ITEM BUNDLE ${currentBundle} OF ${bundles} --------->`);
const lowerRange = currentBundle * this.concurrency;
const upperRange = (currentBundle + 1) * this.concurrency;
itemPromises.push(
this.items.slice(lowerRange, upperRange).map(item => item.run())
);
await Promise.all(itemPromises);
currentBundle++;
}
};
}
export default AsyncItemQueue;
这是队列正在运行的简单项目类。我省略了多余的代码。
class Item {
// ...
run = async () => {
console.log('Item RUN', this, this.name);
return await Promise.all(this.queries.map(query => {
const itemPromise = query.run(this.name);
return itemPromise;
}));
}
}
这是项目中包含的查询。每个项目都有一个查询列表。同样,一些代码被删除,因为它并不有趣。
class Query {
// ...
run = async (item) => {
// Step 1: If requisites, await.
if (this.requires) {
await this.savedData[this.requires];
}
// Step 2: Resolve URL.
this.resolveUrl(item);
// Step 3: If provides, create promise in savedData.
const fetchPromise = this.fetch();
if (this.saveData) {
this.saveData.forEach(sd => (this.savedData[sd] = fetchPromise));
}
// Step 4: Fetch.
const document = await fetchPromise;
// ...
}
}
while inAsyncItemQueue
正确停止,但仅在执行流程到达第 3 步之前Query
。一旦它到达那个 fetch,它是标准 fetch 函数的包装器,外部 promise 就会解析,并且我最终会同时执行所有请求。
我怀疑问题出在 Query 类的某个地方,但我对如何避免外部承诺的解决感到困惑。
我尝试让Query
类run
函数返回文档,以防万一,但无济于事。
任何想法或指导将不胜感激。我将尝试回答有关代码的任何问题或在需要时提供更多信息。
谢谢!
PS:这是一个带有工作示例的代码框:https ://codesandbox.io/s/goofy-tesla-iwzem
正如您在控制台出口中看到的那样,while 循环在提取完成之前进行迭代,并且它们都在同时执行。