2

在我的应用程序中,我有一个服务,它返回一个像这样的 observable:

public genericService(params) {
    //Do some stuff
    //...

    return this.http.post('http://foo.com', params)
        .map((response) => {
            //Do some generic stuff
            //...

            return someData;
        })
        .catch((error: any) => {
            //Manage error in a generic way + do some generic stuff
            //...

            return Observable.throw(error);
        });
}

let debouncePointer = debounceObservable(genericService, 200);

public genericServiceDebounce(params) {
    return debouncePointer(params);
}

现在在另一个地方,我想这样调用我的函数

genericServiceDebounce(params)
    .subscribe((response) => {
        //Do some non-generic stuff
    }, (error) => {
        //Manage error in a non-generic way + do some non-generic stuff
    });

但是我没有成功实现 debounceObservable() 函数。

我基于 Promise 等效项(https://github.com/moszeed/es6-promise-debounce/blob/master/src/es6-promise-debounce.js)尝试了这个实现:

debounceObservable(callback, delay, immediate?) {
    let timeout;
    return function () {
        let context = this, args = arguments;

        return Observable.create((observer) => {
            let later = function () {
                timeout = null;

                if(!immediate) {
                    observer.next(callback.apply(context, args));
                    //observer.onCompleted(); // don't know if this is needed
                }
            };
            let callNow = immediate && !timeout;
            clearTimeout(timeout);
            timeout = setTimeout(later, delay);

            if(callNow) {
                observer.next(callback.apply(context, args));
                //observer.onCompleted(); // don't know if this is needed
            }
        });
    }
}

但这并没有按预期工作。使用 Promises 时,返回 resolve(anotherPromise) 允许您调用:

genericServiceDebounce().then(response => {

})

当使用 Observables 时,返回 observable.next(anotherObservable) 会返回一个嵌入的 observable,这意味着你应该调用:

genericServiceDebounce().subscribe(obs => {
    obs.subscribe(response => {

    })
})

你将如何实现 debounceObservable() 函数?(以类似 Promise 的方式)

澄清 1:我找到了 Observable.debounce() 函数,但这会使观察者而不是可观察者本身去抖动。我想消除可观察的

澄清 2:我将 debounce 放在服务端,因为它是一个单例,并且它们是多个调用者。如果我把它放在调用方,每个调用方会有一个不同的去抖动计时器。

编辑:这是一个片段,我试图解释我的问题。只需单击不同的按钮即可查看不同的行为(更多解释见 js 代码注释)。

Observable.debounce展示了来自 RxJs 的 .debounce() 是如何工作的。它只输出“3”,但我想要“1”、“2”、“3”。

Observable.debounce x3显示了如果我调用代码 3 次而不将整个函数包装在去抖动中会发生什么。

Observable Wrapped x3显示了我想要获得的内容。我的整个函数都被包装了,但是如果你看代码,订阅部分很挑剔。

Promise x3展示了使用 Promises 是多么简单。

let log = (logValue) => {
    const list = document.querySelector('#logs');
    const li = document.createElement('li');
    li.innerHTML = logValue;
    list.appendChild(li);
}

/* ************************ */
/* WITH OBSERVABLE.DEBOUNCE */
/* ************************ */

let doStuffObservable = () => {
    Rx.Observable.create((observer) => {
        log('this should be called only one time (observable.debounce)');
        setTimeout(() => {
            observer.next('observable.debounce 1');
            observer.next('observable.debounce 2');
            observer.next('observable.debounce 3');
        }, 1000);
    })
        .debounce(500)
        .subscribe((response) => {
            log(response);
        }, (error) => {
            log(error);
        });
}

/* *********************************** */
/* WITH OBSERVABLE WRAPPED IN DEBOUNCE */
/* *********************************** */

let doStuffObservable2 = (param) => {
    return Rx.Observable.create((observer) => {
        log('this should be called only one time (observable wrapped)');
        setTimeout(() => {
            observer.next('observable wrapped ' + param);
        }, 1000);
    })
}

let debounceObservable = (callback, delay, immediate) => {
    let timeout;
    return function () {
        let context = this, args = arguments;

        return Rx.Observable.create((observer) => {
            let later = function () {
                timeout = null;

                if(!immediate) {
                    observer.next(callback.apply(context, args));
                }
            };
            let callNow = immediate && !timeout;
            clearTimeout(timeout);
            timeout = setTimeout(later, delay);

            if(callNow) {
                observer.next(callback.apply(context, args));
            }
        });
    }
}

let doStuffObservable2Debounced = debounceObservable(doStuffObservable2);

  
/* ************* */
/* WITH PROMISES */
/* ************* */

let doStuffPromise = (param) => {
    return new Promise((resolve, reject) => {
        log('this should be called only one time (promise)');
        setTimeout(() => {
            resolve('promise ' + param);
        }, 1000);
    });
}

let debouncePromise = (callback, delay, immediate) => {
    let timeout;
    return function () {
        let context = this, args = arguments;
        return new Promise(function (resolve) {
            let later = function () {
                timeout = null;
                
                if (!immediate) {
                    resolve(callback.apply(context, args));
                }
            };
            let callNow = immediate && !timeout;
            clearTimeout(timeout);
            timeout = setTimeout(later, delay);
            
            if (callNow) {
                resolve(callback.apply(context, args));
            }
        });
    }
}

  
/* ******* */
/* SAMPLES */
/* ******* */

function doObservableDebounce() {
  doStuffObservable();
  
  // result :
  
      // this should be called only one time (observable.debounce)
      // observable.debounce 3
  
  // this is not what i want, i want all three values in output
}

function doObservableDebounce3Times() {
  doStuffObservable();
  doStuffObservable();
  doStuffObservable();
  
  // result :
  
      // this should be called only one time (observable.debounce)
      // this should be called only one time (observable.debounce)
      // this should be called only one time (observable.debounce)
      // observable.debounce 3
      // observable.debounce 3
      // observable.debounce 3
  
  // this is bad
}

function doObservableWrappedDebounce3Times() {
  doStuffObservable2Debounced(1)
      .subscribe((response) => {
          log(response);
          response.subscribe((response2) => {
              log(response2);
          }, (error) => {
              log(error);
          })
      }, (error) => {
          log(error);
      });
  doStuffObservable2Debounced(2)
      .subscribe((response) => {
          log(response);
          response.subscribe((response2) => {
              log(response2);
          }, (error) => {
              log(error);
          })
      }, (error) => {
          log(error);
      });
  doStuffObservable2Debounced(3)
      .subscribe((response) => {
          log(response);
          response.subscribe((response2) => {
              log(response2);
          }, (error) => {
              log(error);
          })
      }, (error) => {
          log(error);
      });
  
  
  // result :
  
      // AnonymousObservable { source: undefined, __subscribe: [Function] }
      // this should be called only one time (observable wrapped)
      // observable wrapped 3
  
  // this is good but there are 2 embedded subscribe
}

function doPromiseDebounce3Times() {
  let doStuffPromiseDebounced = debouncePromise(doStuffPromise);
  
  doStuffPromiseDebounced(1).then(response => {
      log(response);
  })
  doStuffPromiseDebounced(2).then(response => {
      log(response);
  })
  doStuffPromiseDebounced(3).then(response => {
      log(response);
  })
  
  // result :
  
      // this should be called only one time (promise)
      // promise 3
  
  // this is perfect
}
<!DOCTYPE html>
<html>

  <head>
    <script data-require="rxjs@4.0.6" data-semver="4.0.6" src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
  </head>

  <body>
    <button onclick='doObservableDebounce()'>Observable.debounce</button>
    <button onclick='doObservableDebounce3Times()'>Observable.debounce x3</button>
    <button onclick='doObservableWrappedDebounce3Times()'>Observable wrapped x3</button>
    <button onclick='doPromiseDebounce3Times()'>Promise x3</button>
    <ul id="logs"></ul>
  </body>

</html>

4

2 回答 2

2

抱歉,我没有收到您对我评论的回复的任何通知。

对于这个问题,一个更简洁的 Rx-only 解决方案是将您的服务调用视为事件流,如下所示:

constructor() {
    this._genericServiceCall$ = new ReplaySubject(1);
    this._genericServiceResult$ = this._genericServiceCall$
        .asObservable()
        .debounceTime(1000)
        .switchMap(params => this._genericService(params));
}

private _genericService(params) {
    //Do some stuff
    //...

    return this.http.post('http://foo.com', params)
        .map((response) => {
            //Do some generic stuff
            //...

            return someData;
        })
        .catch((error: any) => {
            //Manage error in a generic way + do some generic stuff
            //...

            return Observable.throw(error);
        });
}

public genericService(params) {
    this._genericServiceCall$.next(params);
    return this._genericServiceResult$; // Optionally add `.take(1)` so the observer has the expected behaviour of only getting 1 answer back
}

不过,我在其中看到了一些东西……params您会接受哪些作为必须通过私人的_genericService

无论如何,你关注这里发生的事情吗?因此,每次有人调用genericService()它时,它都不会立即调用该服务 - 相反,它会发出一个新的_genericServiceCall$并返回_genericServiceResult$流。如果我们看一下这个流是如何定义的,我们会看到它需要一个去抖动_genericServiceCall$,然后将它映射到服务调用。理论上它应该工作 - 没有尝试过。

编辑:现在我明白了 - 您可能需要发布 genericServiceResult 以使其成为热可观察对象,否则它会在任何观察者订阅它时立即返回:

constructor() {
    this._genericServiceCall$ = new ReplaySubject(1);
    this._genericServiceResult$ = this._genericServiceCall$
        .asObservable()
        .debounceTime(1000)
        .switchMap(params => this._genericService(params))
        .publish();
    const subscription = this._genericServiceResult$.connect();
    // You must store subscription somewhere and dispose it when this object is destroyed - If it's a singleton service this might not be needed.
}
于 2017-12-01T08:23:06.953 回答
0

好吧,我想我找到了办法。我应该做的是更换:

observer.next(callback.apply(context, args));

经过

callback.apply(context, args).subscribe((response) => {
        observer.next(response)
    }, (error) => {
        observer.error(error);
    });

最后,这可以像经典的 observable 一样使用:

debouncedObservable(1)
    .subscribe((response) => {
        log(response);
    }, (error) => {
        log(error);
    });

这是一个实现的片段:

let log = (logValue) => {
    const list = document.querySelector('#logs');
    const li = document.createElement('li');
    li.innerHTML = logValue;
    list.appendChild(li);
}

/* *********************************** */
/* WITH OBSERVABLE WRAPPED IN DEBOUNCE */
/* *********************************** */

let doStuffObservable = (param) => {
    return Rx.Observable.create((observer) => {
        log('this should be called only one time (observable wrapped)');
        setTimeout(() => {
            observer.next('observable wrapped ' + param);
        }, 1000);
    })
}

let debounceObservable = (callback, delay, immediate) => {
    let timeout;
    return function () {
        let context = this, args = arguments;

        return Rx.Observable.create((observer) => {
            let later = function () {
                timeout = null;

                if(!immediate) {
                    callback.apply(context, args).subscribe((response) => {
                          observer.next(response)
                      }, (error) => {
                          observer.error(error);
                      });
                }
            };
            let callNow = immediate && !timeout;
            clearTimeout(timeout);
            timeout = setTimeout(later, delay);

            if(callNow) {
                callback.apply(context, args).subscribe((response) => {
                          observer.next(response)
                      }, (error) => {
                          observer.error(error);
                      });
            }
        });
    }
}

let doStuffObservable2Debounced = debounceObservable(doStuffObservable);

/* ******* */
/* SAMPLES */
/* ******* */

function doObservableWrappedDebounce3Times() {
  doStuffObservable2Debounced(1)
      .subscribe((response) => {
          log(response);
      }, (error) => {
          log(error);
      });
      
  doStuffObservable2Debounced(2)
      .subscribe((response) => {
          log(response);
      }, (error) => {
          log(error);
      });
      
  doStuffObservable2Debounced(3)
      .subscribe((response) => {
          log(response);
      }, (error) => {
          log(error);
      });
}
<!DOCTYPE html>
<html>

  <head>
    <script data-require="rxjs@4.0.6" data-semver="4.0.6" src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
  </head>

  <body>
    <button onclick='doObservableWrappedDebounce3Times()'>Observable wrapped x3</button>
    <ul id="logs"></ul>
  </body>

</html>

如果您认为我遗漏了什么,请发表评论。

于 2017-11-30T19:18:32.850 回答