Rx 确实非常适合这个问题 IMO。
IObservables
由于显而易见的原因(您必须首先观察整个流以保证正确的输出顺序),不能“OrderBy”,所以我在下面的回答假设(您所说的)您的 2 个源事件流是有序的。
最后这是一个有趣的问题。标准的 Rx 运算符缺少一个GroupByUntilChanged
可以轻松解决这个问题的方法,只要它OnComplete
在观察到下一组的第一个元素时调用前一组 observable 即可。但是,查看DistinctUntilChanged
它的实现并不遵循这种模式,并且仅OnComplete
在源 observable 完成时调用(即使它知道在第一个非不同元素之后将不再有元素......奇怪???)。无论如何,出于这些原因,我决定反对一种GroupByUntilChanged
方法(不违反 Rx 约定),而是选择了ToEnumerableUntilChanged
.
免责声明:这是我的第一个 Rx 扩展,因此希望对我的选择提供反馈。此外,我主要关心的一个问题是持有该distinctElements
列表的匿名 observable。
首先,您的应用程序代码非常简单:
public class Event
{
public DateTime Timestamp { get; set; }
}
private IObservable<Event> eventStream1;
private IObservable<Event> eventStream2;
public IObservable<IEnumerable<Event>> CombineAndGroup()
{
return eventStream1.CombineLatest(eventStream2, (e1, e2) => e1.Timestamp < e2.Timestamp ? e1 : e2)
.ToEnumerableUntilChanged(e => e.Timestamp);
}
现在ToEnumerableUntilChanged
执行(代码警告墙):
public static IObservable<IEnumerable<TSource>> ToEnumerableUntilChanged<TSource,TKey>(this IObservable<TSource> source, Func<TSource,TKey> keySelector)
{
// TODO: Follow Rx conventions and create a superset overload that takes the IComparer as a parameter
var comparer = EqualityComparer<TKey>.Default;
return Observable.Create<IEnumerable<TSource>>(observer =>
{
var currentKey = default(TKey);
var hasCurrentKey = false;
var distinctElements = new List<TSource>();
return source.Subscribe((value =>
{
TKey elementKey;
try
{
elementKey = keySelector(value);
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
if (!hasCurrentKey)
{
hasCurrentKey = true;
currentKey = elementKey;
distinctElements.Add(value);
return;
}
bool keysMatch;
try
{
keysMatch = comparer.Equals(currentKey, elementKey);
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}
if (keysMatch)
{
distinctElements.Add(value);
return;
}
observer.OnNext( distinctElements);
distinctElements.Clear();
distinctElements.Add(value);
currentKey = elementKey;
}), observer.OnError, () =>
{
if (distinctElements.Count > 0)
observer.OnNext(distinctElements);
observer.OnCompleted();
});
});
}