我希望有人能够指出我正确的方向,因为我正在努力结合并发和取消rxjs 中排队请求的能力。我将尝试在连续事件中解释这一点。假设我们有 observable A,它接收一个字符串数组。
事件:A 观察到:['dog', 'cat', 'elephant', 'tiger'] 下游检查字符串网络响应是否被缓存,如果它存在于缓存中则从缓存中获取它,如果没有则从缓存中请求它web 并使用 publishReplay / shareReplay 将 observable 保存到缓存中。一次发生 2 个网络请求的限制,因此它尝试从 api 获取“狗”和“猫”(此操作需要 2000 毫秒以上)。1000 毫秒后,A 观察到另一组值:['dog', 'rat', 'horse', 'rabbit']。
接下来应该发生以下情况。我不希望取消“狗”和“猫”请求,我希望他们完成他们的请求,但我想忽略第一个请求中的“大象”和“老虎”。一旦 'dog' 和 'cat' 从第二帧得到他们的响应'rat' 和 'horse' 应该从网络请求,最后一旦其中任何一个解析 'rabbit' 被请求。
这是我当前的代码。我尝试在 networkObservable 的 defer 和 from 之间切换,但行为不同,这也不是我想要的。
const cache = new Map();
// Fake Promise to fake a api request
function later(delay, value) {
console.log('requesting', value);
return new Promise(resolve => setTimeout(resolve, delay, value));
}
const testObservable = of(['dog', 'rat', 'horse', 'rabbit']).pipe(
delay(1000),
startWith(['dog', 'cat', 'elephant', 'tiger'])
);
testObservable.pipe(
map(array => from(array).pipe(
publish(arrayObservable => {
const cachedObservable = arrayObservable.pipe(
filter(id => cache.has(id)),
flatMap(id => cache.get(id), 1)
);
const uncachedObservable = arrayObservable.pipe(
filter(id => !cache.has(id)),
flatMap(id => {
const networkObservable = from(later(2000, id)).pipe(
tap(e => console.log('response', e)),
map(e => 'parsed: ' + e),
tap(e => console.log('parsed', e)),
publishReplay(1),
refCount(),
take(1)
);
cache.set(id, networkObservable);
return networkObservable;
}, 2)
);
return merge(cachedObservable, uncachedObservable);
})
)),
switchAll()
)
这导致输出:
requesting dog
requesting cat
requesting rat
requesting horse
response dog
parsed parsed: dog
response rat
parsed parsed: rat
requesting rabbit
response horse
parsed parsed: horse
response rabbit
parsed parsed: rabbit
这接近想要的行为,但有一个明显的缺陷。Rat and horse 被请求并且不等待 dog 和 cat 在被处决之前解决。但是,“老虎”和“大象”已被妥善处理,因此功能正常。
我是否必须创建一个单独的主题来处理请求?