我在 RxJS 中有一个特殊的生产者消费者问题:生产者缓慢地生产元素。消费者正在请求元素,并且通常必须等待生产者。这可以通过压缩生产者和请求流来实现:
var produce = getProduceStream();
var request = getRequestStream();
var consume = Rx.Observable.zipArray(produce, request).pluck(0);
有时请求会中止。生成的元素只应在未中止的请求后使用:
produce: -------------p1-------------------------p2--------->
request: --r1--------------r2---------------r3-------------->
abort: ------a(r1)------------------a(?)------------------>
consume: ------------------c(p1, r2)-------------c(p2, r3)-->
第一个请求r1
将消耗第一个生成的元素p1
,但在它可以消耗之前r1
被中止。产生并在第二次请求时被消耗。第二次中止被忽略,因为之前没有发生过未回答的请求。第三个请求必须等待下一个生成的元素,并且在生成之前不会中止。因此,它在生产后立即被消耗。a(r1)
p1
p1
c(p1, r2)
r2
a(?)
r3
p2
p2
p2
c(p2, r3)
如何在 RxJS 中实现这一点?
编辑:
我在 jsbin 上创建了一个带有 QUnit 测试的示例。您可以编辑该功能createConsume(produce, request, abort)
以尝试/测试您的解决方案。
该示例包含先前接受的 answer的函数定义。