5

我正在使用 RxJs 来计算在特定时间窗口内到达的数据包数量。我的代码基本上是这样的:

// Stream that every incoming packet is pushed into (see newPacket).
var packetSubject = new Rx.Subject();
// One entry per time window: the number of packets that window contained.
var packetsInWindow = [];

/**
 * Starts counting the packets pushed into `packetSubject`, one count per
 * 1000 ms window, appending each count to `packetsInWindow`.
 *
 * @param {Object} [scheduler] - Optional Rx scheduler used to time the
 *     windows. Pass an `Rx.TestScheduler` to drive the windows in virtual
 *     time from a unit test; omit it to keep the library's default scheduler.
 * @returns {Object} The subscription; dispose it to stop monitoring.
 */
function startMonitoring(scheduler) {
    // Only forward the scheduler when one was supplied so the no-argument
    // call behaves exactly like windowWithTime(1000).
    var windows = scheduler
        ? packetSubject.windowWithTime(1000, scheduler)
        : packetSubject.windowWithTime(1000);

    // Flatten each window's buffered contents into the outer stream instead
    // of subscribing inside a projection, so a single subscription owns the
    // whole pipeline and can be disposed by the caller.
    return windows
        .selectMany(function(packetWindow) {
            return packetWindow.toArray();
        })
        .subscribe(function(elements) {
            packetsInWindow.push(elements.length);
        });
}

/**
 * Feeds a newly arrived packet into the monitored packet stream.
 *
 * @param {*} packet - The packet to publish to subscribers of `packetSubject`.
 */
function newPacket(packet) {
    var stream = packetSubject;
    stream.onNext(packet);
}

如何使用 Rx TestScheduler 对该代码进行单元测试?我找不到任何适合测试 Subject 的示例。

4

2 回答 2

4

看看这个例子:

   // Holds the most recent value that made it through the throttled stream.
   var latestValue = 0;
   var scheduler = new Rx.TestScheduler();

   var subject = new Rx.Subject();
   var throttled = subject.throttle(100, scheduler);
   throttled.subscribe(function (value) {
       latestValue = value;
   });

   // Actions to run at fixed points in the scheduler's virtual time.
   var emitFirstValue = function () {
       subject.onNext(1); // trigger first event with value 1
   };
   var expectNotYetUpdated = function () {
       expect(latestValue).toEqual(0); // value hasn't been updated
   };
   var expectUpdated = function () {
       expect(latestValue).toEqual(1); // value updated after throttle's windowDuration
   };

   scheduler.scheduleWithAbsolute(0, emitFirstValue);
   scheduler.scheduleWithAbsolute(50, expectNotYetUpdated);
   scheduler.scheduleWithAbsolute(200, expectUpdated);

   // Nothing scheduled above runs until the scheduler is started.
   scheduler.start();

https://emmkong.wordpress.com/2015/03/18/how-to-unit-test-rxjs-throttle-with-rx-testscheduler/

于 2015-03-18T05:10:35.883 回答
0

使用 RxJS 4.x,您可以在 TestScheduler 上调用 advanceBy。TestScheduler 可以在测试的 setup 阶段(例如 beforeAll 中)注入。我写了一篇关于它的博客文章。

这是示例。

    // Length of the throttle window; the same value is passed to throttle()
    // and used to advance the test scheduler in the specs below.
    var throttleWindowDuration = 2 * 1000; /* 2 seconds */

    /**
     * Creates a fresh Subject, throttles it by `throttleWindowDuration`, and
     * counts every value the throttled stream emits.
     *
     * @returns {{emitCounter: number, unthrottledStream: Object, subscription: Object}}
     *     `emitCounter` is incremented on each throttled emission,
     *     `unthrottledStream` is the Subject to push values into, and
     *     `subscription` is the handle for the counting subscription so the
     *     caller can dispose it when finished.
     */
    function throttleTest() {
      var unthrottledStream = new Rx.Subject();
      // Called with the duration only, so a spy wrapping throttle() can
      // supply its own scheduler.
      var source = unthrottledStream.throttle(throttleWindowDuration);
      var result = {
        emitCounter: 0,
        unthrottledStream: unthrottledStream,
        subscription: null
      };

      // Keep the subscription on the result rather than dropping it in an
      // unused local; otherwise nothing could ever dispose it.
      result.subscription = source.subscribe(function() {
        result.emitCounter++;
      });

      return result;
    }

    // Suite that reroutes throttle() onto a TestScheduler so the throttle
    // windows can be stepped through synchronously.
    describe('run with test scheduler', function() {
      var testScheduler;
      var throttleSpy;

      // Pushes `count` valueless notifications into `stream`, back to back.
      function emit(stream, count) {
        for (var i = 0; i < count; i++) {
          stream.onNext();
        }
      }

      beforeAll(function() {
        testScheduler = new Rx.TestScheduler();
        // Capture the real operator before the spy replaces it.
        var realThrottle = Rx.Observable.prototype.throttle;
        throttleSpy = spyOn(Rx.Observable.prototype, 'throttle')
          .and.callFake(function(dueTime) {
            // Same call as the caller made, plus the injected test scheduler.
            return realThrottle.call(this, dueTime, testScheduler);
          });
      });

      afterAll(function() {
        throttleSpy.and.callThrough();
      });

      it('advancing testScheduler allows to test throttling synchronously', function() {
        var outcome = throttleTest();
        var stream = outcome.unthrottledStream;

        //throttled stream will not emit if scheduler clock is at zero
        testScheduler.advanceBy(1);
        emit(stream, 3);

        testScheduler.advanceBy(throttleWindowDuration);
        emit(stream, 2);

        testScheduler.advanceBy(throttleWindowDuration);
        emit(stream, 1);

        testScheduler.advanceBy(throttleWindowDuration);
        emit(stream, 1);

        expect(outcome.emitCounter).toBe(4);
      });
    });

    // Control suite: throttle() is not rerouted here, so no virtual time
    // passes between the emissions below.
    describe('run without test scheduler', function() {
      it('without test scheduler the emit counter will stay at 1 ' 
        + 'as throttle duration is not elapsed', function() {
        var outcome = throttleTest();
        var stream = outcome.unthrottledStream;

        // Four emissions in immediate succession.
        for (var i = 0; i < 4; i++) {
          stream.onNext();
        }

        expect(outcome.emitCounter).toBe(1);
      });
    });

使用 RxJS 5,一切都变得简单多了。您可以像测试其他基于时间的 JavaScript 操作一样,直接使用 jasmine.clock。我已经在我的博客文章中展示了该解决方案。

于 2017-01-27T03:58:53.130 回答