6

我在 RxJS 中有一个特殊的生产者消费者问题:生产者缓慢地生产元素。消费者正在请求元素,并且通常必须等待生产者。这可以通过压缩生产者和请求流来实现:

var produce = getProduceStream();
var request = getRequestStream();

var consume = Rx.Observable.zipArray(produce, request).pluck(0);

有时请求会中止。生成的元素只应在未中止的请求后使用:

produce:  -------------p1-------------------------p2--------->
request:  --r1--------------r2---------------r3-------------->
abort:    ------a(r1)------------------a(?)------------------>
consume:  ------------------c(p1, r2)-------------c(p2, r3)-->

第一个请求r1将消耗第一个生成的元素p1,但在它可以消耗之前r1被中止。产生并在第二次请求时被消耗。第二次中止被忽略,因为之前没有发生过未回答的请求。第三个请求必须等待下一个生成的元素,并且在生成之前不会中止。因此,它在生产后立即被消耗。a(r1)p1p1c(p1, r2)r2a(?)r3p2p2p2c(p2, r3)

如何在 RxJS 中实现这一点?

编辑: 我在 jsbin 上创建了一个带有 QUnit 测试的示例您可以编辑该功能createConsume(produce, request, abort)以尝试/测试您的解决方案。

该示例包含先前接受的 answer的函数定义。

4

3 回答 3

3

这个(核心思想减去细节)通过了你的 JSBin 测试:

var consume = request
  .zip(abort.merge(produce), (r,x) => [r,x])
  .filter(([r,x]) => isNotAbort(x))
  .map(([r,p]) => p);

JSBin 代码

于 2015-02-27T02:20:43.943 回答
2

我无法完全思考如何使用现有的操作员来做这件事。以下是如何做到这一点Observable.create()

return Rx.Observable.create(function (observer) {
  var rsub = new Rx.SingleAssignmentDisposable();
  var asub = new Rx.SingleAssignmentDisposable();
  var psub = new Rx.SingleAssignmentDisposable();
  var sub = new Rx.CompositeDisposable(rsub, asub, psub);
  var rq = [];
  var pq = [];
  var completeCount = 0;
  var complete = function () {
    if (++completeCount === 2) {
      observer.onCompleted();
    }
  };
  var consume = function () {
    if (pq.length && rq.length) {
      var p = pq.shift();
      var r = rq.shift();
      observer.onNext('p' + p);
    }
  };

  rsub.setDisposable(request.subscribe(
    function (r) {
      rq.push(r);
      consume();
    },
    function (e) { observer.onError(e); },
    complete));

  asub.setDisposable(abort.subscribe(
    function (a) {
      rq.shift();
    },
    function (e) { observer.onError(e); }
  ));

  psub.setDisposable(produce.subscribe(
    function (p) {
      pq.push(p);
      consume();
    },
    function (e) { observer.onError(e); },
    complete));


  return sub;
});

http://jsbin.com/zurepesijo/1/

于 2015-02-27T17:01:11.617 回答
0

此解决方案忽略不遵循未响应请求的中止:

const {merge} = Rx.Observable;

Rx.Observable.prototype.wrapValue = function(wrapper) {
    wrapper = (wrapper || {});
    return this.map(function (value) {
        wrapper.value = value;
        return wrapper;
    });
};

function createConsume(produce, request, abort) {
  return merge(
            produce.wrapValue({type: 'produce'}),
            request.wrapValue({type: 'request'}),
            abort.wrapValue({type: 'abort'})
         )
         .scan(
            [false, []],
            ([isRequest, products], e) => {
                // if last time the request was answered
                if (isRequest && products.length) {
                    // remove consumed product
                    products.shift();
                    // mark request as answered
                    isRequest = false;
                }
                if (e.type === 'produce') {
                    // save product to consume later
                    products.push(e.value);
                } else {
                    // if evaluated to false, e.type === 'abort'
                    isRequest = (e.type === 'request');
                }
                return [isRequest, products];
            }
         )
         .filter( ([isRequest, products]) => (isRequest && products.length) )
         .map( ([isRequest, products]) => products[0] ); // consume
}

JSBin 最新测试中的代码

于 2015-03-12T09:54:27.577 回答