8

如何对 Observable 进行分组,并从每个 GroupedObservable 仅将最后发出的项目保留在内存中?这样每个组的行为就像 BehaviorSubject 一样。

像这样的东西:

{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}

所以在内存中,我们只有每个的最后一项user

{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}

当订阅者订阅时,这两个项目会立即发布(每个项目都有自己的发射)。就像我们每个 BehaviorSubject 一样user

onCompleted() 永远不会被触发,因为人们可能会永远聊天。

我事先不知道user可以有什么价值。

4

2 回答 2

3

我假设你的聊天记录很热。因此,#groupBy 发出的 groupObservables 也会很热,并且不会自行将任何内容保存在内存中。

要获得您想要的行为(丢弃除订阅前的最后一个值之外的所有内容并从那里继续),您可以使用 ReplaySubject(1)。

如果我错了请纠正我

jsbin

var groups = chatlog
      .groupBy(message => message.user)
      .map(groupObservable => {
        var subject = new Rx.ReplaySubject(1);
        groupObservable.subscribe(value => subject.onNext(value));
        return subject;
      });
于 2015-12-08T18:32:59.130 回答
2

你可以编写 reduce 函数,输出分组的 observables 的最新发出的项目,将其传递给scanobservable,并使用它shareReplay来调用为新订阅者发出的最后一个值。它会是这样的:

var fn_scan = function ( aMessages, message ) {
  // aMessages is the latest array of messages
  // this function will update aMessages to reflect the arrival of the new message
  var aUsers = aMessages.map(function ( x ) {return x.user;});
  var index = aUsers.indexOf(message.user);
  if (index > -1) {
    // remove previous message from that user...
    aMessages.splice(index, 1);
  }
  // ...and push the latest message
  aMessages.push(message);
  return aMessages;
};
var groupedLatestMessages$ = messages$
    .scan(fn_scan, [])
    .shareReplay(1);

因此,您在订阅时得到的是一个数组,其大小在任何时候都是发出消息的用户数,其内容将是用户按发出时间排序的消息。

只要有订阅,最新的数组就会立即传递给订阅者。那是一个数组,我想不出一种方法如何一个一个地传递值,同时满足您的规范。希望这对您的用例来说已经足够了。

更新:这里的 jsbin http://jsfiddle.net/zs7ydw6b/2

于 2015-12-08T01:15:45.227 回答