5

我正在尝试使用 RxJS 创建一个可观察的对象,它可以执行如图所示的操作。

预期的可观察映射

  • 获取一个值并在获取下一个值之前等待一段固定的时间。
  • 下一个将是等待期间发出的最后一个值,跳过其余的。
  • 如果在没有发出任何值的情况下经过等待间隔,则应立即抓取下一个,如图像的最后一个示例所示。
4

2 回答 2

5

这应该可以解决问题。

var Rx      = require('rx'),
    source  = Rx.Observable.interval(10).take(100),
    log     = console.log.bind(console);

Rx.Observable.create(function (observer) {

    var delaying = false,
        hasValue = false,
        complete = false,
        value;

    function onNext (x) {
      value = x;
      if (delaying) {
        hasValue = true;
      } else {
        sendValue();
      }
    }

    function sendValue () {
      observer.onNext(value);
      if (complete) {
        observer.onCompleted();
      } else {
        setTimeout(callback, 1000); // exercise for the reader. Use a scheduler.
      }
      delaying = true;
    }

    function callback () {
      if (hasValue) {
        hasValue = false;
        sendValue();
      } else {
        delaying = false;
      }
    }

    return source.subscribe(
        onNext,
        observer.onError.bind(observer),
        function () {
          if (hasValue) {
            complete = true;
          } else {
            observer.onCompleted();
          }
        }
      );
  })
  .subscribe(log);
于 2014-05-24T04:57:41.370 回答
1

这是克里斯托弗的解决方案修改为运算符。

运算符仅存储来自源的throttleImmediate最新值,直到给定的选择器完成。它会在每次完成后立即触发缓存值(如果存在)。它最适合在选择器有副作用(例如动画)时使用。

var Rx  = require('rx'),
source  = Rx.Observable.interval(10).take(500),
log     = console.log.bind(console);

Rx.Observable.prototype.throttleImmediate = function (selector) {
    var source = this;

    return Rx.Observable.create(function (observer) {

        var delaying = false,
            hasValue = false,
            complete = false,
            value;

        function onNext (x) {
          value = x;
          if (delaying) {
            hasValue = true;
          } else {
            sendValue();
          }
        }

        function sendValue () {
          delaying = true;
          selector(value).subscribe(
            observer.onNext.bind(observer),
            observer.onError.bind(observer),
            function(){
              if (hasValue) {
                hasValue = false;
                sendValue();
              } else {
                delaying = false;
              }
            }
          );
        }

        return source.subscribe(
            onNext,
            observer.onError.bind(observer),
            function () {
              if (hasValue) {
                complete = true;
              } else {
                observer.onCompleted();
              }
            }
          );
      });
};

source
  .throttleImmediate(function(data){
    var delay;

    if(data%2==0)
      delay=500;
    else
      delay=1000;

    return Rx.Observable.timer(delay).map(function(){ return data; });
  })
  .subscribe(log)

这在对延迟值只有选择器知道的源进行反压时派上用场。

示例:给定问题的大理石图。

让我们假设第一个来源是带有要显示的 html 数据的 ajax 调用,ajaxPages它源自对导航栏的点击。我们希望将它们与入口动画一起渲染animatePage,其持续时间是动态的。

ajaxPages.throttleImmediate(animatePage).subscribe();

在这里,我们使用源中的值对页面进行动画处理,跳过动画期间发出的所有值,除了最新的值。

在实践中,我们得到的是一个流,它忽略了紧随其后的其他点击,并且对于向用户显示毫无用处,因为它们会动画化,然后立即动画化。

于 2014-05-26T13:25:31.307 回答