我有一个包含一个数组的流,其中每个元素都有一个 id。我需要将其拆分为每个 id 的流,当源流不再携带 id 时,该流将完成。
例如具有这三个值的输入流序列
[{a:1}, {b:1}] [{a:2}, {b:2}, {c:1}] [{b:3}, {c:2}]
应该返回三个流
a -> 1 2 |
b -> 1 2 3
c -> 1 2
其中 a 在第 3 个值完成,因为它的 id 已经消失,而 c 在第 2 个值创建,因为它的 id 已经出现。
我正在尝试 groupByUntil,有点像
var input = foo.share();
var output = input.selectMany(function (s) {
return rx.Observable.fromArray(s);
}).groupByUntil(
function (s) { return s.keys()[0]; },
null,
function (g) { return input.filter(
function (s) { return !findkey(s, g.key); }
); }
)
因此,按 id 分组,并在输入流不再具有 id 时处理组。这似乎可行,但输入的两种用途对我来说看起来很奇怪,就像使用单个流来控制 groupByUntil 的输入和组的处置时可能存在奇怪的顺序依赖性。
有没有更好的办法?
更新
确实,这里有一个奇怪的时间问题。fromArray 默认使用 currentThread 调度程序,这将导致来自该数组的事件与来自输入的事件交错。然后在错误的时间(在处理来自先前输入的组之前)评估组上的处置条件。
一种可能的解决方法是使用 fromArray(.., rx.Scheduler.immediate),这将使分组事件与输入保持同步。