-1

应用程序(Saver)通过 websocket 从远程服务器接收实时数据并将其存储在数据库中。它向客户端公开一个 REST 端点,该端点返回迄今为止存储在数据库中的所有数据。

客户端应用程序在启动时订阅远程服务器的 websocket 上的实时数据。然后它向 Saver 的 REST 端点发出请求,并接收到目前为止的所有数据。

IObservable<AType>两个数据源都像在客户端应用程序中一样公开。

AType包括 Timestamp 属性。

如何将这两个 Observable 组合起来,使它们是连续的(按时间戳)而没有重复?

更新:在任一数据源/Observables 中都不可能出现重复,但在它们组合时是可能的,因为在调用 REST 端点之前订阅了 websocket。订阅它们是为了避免数据丢失。

4

2 回答 2

2

不可能在没有缓冲/窗口的情况下按值对实时流进行排序。您需要使用/运算符显式缓冲合并的流 ( Observable.Merge) ,然后您可以使用它按时间戳排序。Observable.BufferObservable.WindowEnumerable.SortBy

这是 rxnet github 页面中的相关问题:https ://github.com/Reactive-Extensions/Rx.NET/issues/122 。

有一个Observable.Distinct运算符,但在长时间运行/高吞吐量流上使用它时要小心,因为它存储值的散列以检测重复项。

于 2018-04-11T20:56:22.747 回答
0

我的工作解决方案:

订阅 websocket

通过 REST 检索到目前为止的所有数据

如果有任何东西通过 websocket 到达,请获取第一项的时间戳,然后从 REST 结果中删除它以及以后的任何内容

像这样组合两个流:

        combinedObservable = truncatedRestData.ToObservable().Concat(socketObservable);
于 2018-04-25T08:12:18.750 回答