0

我正在 TDDing RxJS 解决方案并使用bufferWithTime为什么示例代码中的res得到值[](空数组)?这是我的代码或 RxJS 库中的问题吗?在带有 rx 版本 2.2.27 的 node.js v0.10.30 上运行它。

以下可以在 nodejs coffeescript REPL 中运行

Rx = require 'rx'
onNext = Rx.ReactiveTest.onNext
TEST_EVENT_A = { messageName: 'test_event_a', namespace: 'test' }
events = [onNext(50, TEST_EVENT_A), onNext(100, TEST_EVENT_A)]
scheduler = new Rx.TestScheduler
stream = scheduler.createHotObservable events
excludeEmpty = (event) -> console.log "Filtering...", event; event.length > 0
countValues = (event) -> console.log "Counting...", event; event.length
res = scheduler.startWithTiming((=> stream.bufferWithTime(10).filter(excludeEmpty).map(countValues)), 0, 0, 1000).messages
# => []
4

1 回答 1

0

在调试和阅读文档后,我注意到文档中的以下语句bufferedWithTime[scheduler=Rx.Scheduler.timeout] (Scheduler): Scheduler to run buffer timers on。如果未指定,则使用超时调度程序。

我曾想过,当从 TestScheduler 创建 observable 时,它​​将使用该调度程序的所有方法,但调度程序必须显式传递。所以下面是正确的解决方案:

print = (event) -> console.log "Event: ", event
Rx = require 'rx'
onNext = Rx.ReactiveTest.onNext
TEST_EVENT_A = { messageName: 'test_event_a', namespace: 'test' }
events = [onNext(50, TEST_EVENT_A), onNext(100, TEST_EVENT_A)]
scheduler = new Rx.TestScheduler
stream = scheduler.createHotObservable events
excludeEmpty = (event) -> console.log "Filtering...", event; event.length > 0
countValues = (event) -> console.log "Counting...", event; event.length
# Notice the 2nd argument to bufferWithTime
res = scheduler.startWithTiming((=> stream.bufferWithTime(100, scheduler).filter(excludeEmpty).map(countValues)), 0, 0, 1500).messages
# => res[0].value.value == 2
于 2014-08-11T17:25:15.610 回答