5

我正在尝试将平原string[]转换为 aObservable<string[]>并将其连接到现有的Observable<string[]>.

然后我将使用 angular2async管道来显示Observable.

这是我的代码:

import {Injectable} from "angular2/core";
import {Observable} from "rxjs/Observable";
import 'rxjs/Rx';


@Injectable()
export class AppService {

    constructor() {
        console.log('constructor', 'appService');
        this.constructSomeObservable();
    }

    someObservable$:Observable <string[]>;

    constructSomeObservable() {
        this.someObservable$ = Observable.create(observer => {
            const eventSource = new EventSource('/interval-sse-observable');
            eventSource.onmessage = x => observer.next(JSON.parse(x.data));
            eventSource.onerror = x => observer.error(console.log('EventSource failed'));
            return () => {
                eventSource.close();
            };
        });

        this.someObservable$.subscribe(
            theStrings=> {
                console.log(theStrings);
                //Somehow convert the plain array of strings to an observable and concat it to this.someObservable$ observable...
            },
            error=>console.log(error)
        );
    }
}

有人可以帮忙吗?

此外,我想确保在Observable<string[]>重复调用 EventSource 时不断更新服务实例。我的订阅逻辑是否在正确的位置?

编辑 1:我尝试使用 RxJSconcat运算符,如下所示:

    this.someObservable$.subscribe(
        theStrings=> {
            console.log(theStrings);
            this.someObservable$ = this.someObservable$.concat(Observable.create(theStrings));
        },
        error=>console.log(error)
    );
 }

与 angular2async管道一起:

<ul>
    <li *ngFor="#s of appService.someObservable$ | async">
       a string: {{ s }}
    </li>
</ul>

页面上什么也没有显示;字符串只是显示在控制台上...

我做错了什么?

编辑 2:该应用程序可在 github 上找到

编辑 3:我考虑了 Thierry 的建议,特别是使用async管道来订阅以及使用扫描运算符。

现在唯一剩下的问题是我需要单击路由器链接才能在模板上呈现字符串...模板不会自动更新...

请参阅 github 上的项目和相关标签:https ://github.com/balteo/demo-angular2-rxjs/tree/36864628/536299

4

1 回答 1

5

我会利用scan运营商来做到这一点。这是一个示例:

@Component({
  selector: 'app'
  template: `
    <div>
      <div *ngFor="#elt of someObservable$  | async">{{elt.name}</div>
    </div>
  `
})
export class App {
  constructor() {
    this.someObservable$ = Observable.create((observer) => {
      const eventSource = new EventSource('/interval-sse-observable');
      eventSource.onmessage = x => observer.next(JSON.parse(x.data));
      eventSource.onerror = x => observer.error(console.log('EventSource failed'));
      return () => {
        eventSource.close();
      };
    })
    .startWith([])
    .scan((acc,value) => acc.concat(value));
  }
}

这是相应的 plunkr:https ://plnkr.co/edit/St7LozX3bnOBcoHaG4uM?p=preview 。

有关更多详细信息,请参阅此问题:

于 2016-04-26T12:12:54.620 回答