0
var source = Observable.fromEvent(document.body, 'keypress');


var obs = source

  .bufferTime(1000)

  .map((clickBuffer) => {

    console.log(clickBuffer)

    return clickBuffer.length;
  })

  .take(3)

;

// when using this function - error after 3rd event
obs.subscribe((num) => {

});

我正在 angular 2 beta 和 RxJS 5 上尝试这个。它可以毫无问题地获得 3 个事件并计算每秒的按键次数。为什么我会收到错误消息?我希望它在 3 秒后停止。

这是这个问题的延续: Counting keypresses per second with angular 2 Rxjs

更新

找到了如何不抛出的情况,以不同的方式定义源:

// there are functions in typescript class
// onKeypressReactBuffer is called from angular template on input field keypress
class K {

    onKeypressReactBuffer() {}


    reactiveWay() {

    const source = Observable.create(observer => {

        // next(1) - we will use this value to add
        this.onKeypressReactBuffer = () => { 
            observer.next(1);      


            // this make it not throw the error. 
            observer.complete('ha') };
        });

    }
}

所以仍然存在问题 - 为什么它不适用于函数 fromEvent() ?我也看不到如何在此函数上定义 completed() 方法。所以它应该是一些默认值。

而且 - 我怎样才能从错误消息给我的信息中更快地找到 Observable.create 我需要 .complete() ?直到最近经过数小时的思考后,我才开始尝试——也许它需要完成()函数。

更新

实际上我上次更新的代码不能正常工作。它只是停止给我错误。

上次更新会发生什么 - 如果不按键,它会发出 0 次和 3 次事件。一旦按下 - 它会发出总和 1 的事件并停止发出。

更新:

如何在 web pack starter 上重现:

安装 angular 2 webpack starter 版本 5.0.3(版本 3 也出现错误)。

顺便说一句,我不得不将 package.json 从 Rxjs beta 4 更改为 beta 2,因为否则无法安装。

并在 home.comoponent.ts ngOnInit() 函数中将代码与https://jsbin.com/fafuldayi/edit?js,console,output几乎相同

唯一的区别是使用 Observable 并导入 Observable 而不是 Rx.Observable - 请参阅代码:

import {Component} from 'angular2/core';
import {AppState} from '../app.service';

import {Title} from './title';
import {XLarge} from './x-large';

import { Observable, Subject } from 'rxjs/Rx';

@Component({
  // The selector is what angular internally uses
  // for `document.querySelectorAll(selector)` in our index.html
  // where, in this case, selector is the string 'home'
  selector: 'home',  // <home></home>
  // We need to tell Angular's Dependency Injection which providers are in our app.
  providers: [
    Title
  ],
  // We need to tell Angular's compiler which directives are in our template.
  // Doing so will allow Angular to attach our behavior to an element
  directives: [
    XLarge
  ],
  // We need to tell Angular's compiler which custom pipes are in our template.
  pipes: [ ],
  // Our list of styles in our component. We may add more to compose many styles together
  styles: [ require('./home.css') ],
  // Every Angular template is first compiled by the browser before Angular runs it's compiler
  template: require('./home.html')
})
export class Home {
  // Set our default values
  localState = { value: '' };
  // TypeScript public modifiers
  constructor(public appState: AppState, public title: Title) {

  }

  ngOnInit() {
    console.log('hello `Home` component');
    // this.title.getData().subscribe(data => this.data = data);


    var source = Observable.fromEvent(document.body, 'keypress');


    var obs = source

        .bufferTime(1000)

        .map((clickBuffer) => {

          //console.log(clickBuffer.length)

          return clickBuffer.length;
        })

        .take(5)

        ;

    // when using this function - error after 3rd event
    obs.subscribe((num) => {

      console.log(' home ' + num)

    });


  }

  submitState(value) {
    console.log('submitState', value);
    this.appState.set('value', value);
  }

}

Thierry Templier 示例有效,但我也想了解为什么 Birowsky 示例无法与此入门包一起使用。

顺便说一句,我还看到 Birowsky 使用旧版本 - rxjs@5.0.0-alpha.8 。我试图在这个 url 中包含 beta.2,但后来我得到错误 Rx not found: https://npmcdn.com/@reactivex/rxjs@5.0.0-beta.2/dist/global/Rx.js in jsbin .com

4

1 回答 1

1

我认为您可以使用专门的主题来提供takeUntil操作员来停止数据流。像这样的东西:

var source = Observable.fromEvent(document.body, 'keyup');

var stop = new Subject();
Observable.interval(3100).subscribe(() => stop.next());

var obs = source.bufferTime(1000).map(clickBuffer => {
  console.log(clickBuffer);
  return clickBuffer.length;
}).takeUntil(stop);

obs.subscribe((num) => {
  console.log('num = '+num);
});
于 2016-04-05T08:54:36.763 回答