我正在使用 rx distinct 运算符在长时间运行的过程中根据某个键过滤外部数据流。
这会导致内存泄漏吗?假设将收到很多不同的密钥。rx distinct 运算符如何跟踪先前收到的密钥?
我应该将 groupbyuntil 与持续时间选择器一起使用吗?
我正在使用 rx distinct 运算符在长时间运行的过程中根据某个键过滤外部数据流。
这会导致内存泄漏吗?假设将收到很多不同的密钥。rx distinct 运算符如何跟踪先前收到的密钥?
我应该将 groupbyuntil 与持续时间选择器一起使用吗?
Observable.Distinct
在内部使用 a HashSet
。内存使用将大致与遇到的不同键的数量成正比。(AFAIK 大约 30*n 个字节)
GroupByUntil
做一些与 . 真正不同的事情Distinct
。
GroupByUntil
(好)组,而Distinct
过滤流的元素。
不确定预期用途,但如果您只想过滤您需要的连续相同元素Observable.DistinctUntilChanged
,其内存占用量与键数无关。
这可能是一个有争议的策略,但如果你担心不同的密钥会累积,并且如果有一个时间点可以安全地重置,你可以使用 Observable.Switch 引入重置策略。例如,我们有一个场景,“世界状态”每天都会重置,所以我们可以每天重置不同的 observable。
Observable.Create<MyPoco>(
observer =>
{
var distinctPocos = new BehaviorSubject<IObservable<MyPoco>>(pocos.Distinct(x => x.Id));
var timerSubscription =
Observable.Timer(
new DateTimeOffset(DateTime.UtcNow.Date.AddDays(1)),
TimeSpan.FromDays(1),
schedulerService.Default).Subscribe(
t =>
{
Log.Info("Daily reset - resetting distinct subscription.");
distinctPocos.OnNext(pocos.Distinct(x => x.Id));
});
var pocoSubscription = distinctPocos.Switch().Subscribe(observer);
return new CompositeDisposable(timerSubscription, pocoSubscription);
});
但是,我确实倾向于同意 James World 上面关于使用内存分析器进行测试以在引入潜在不必要的复杂性之前检查内存确实是一个问题的评论。如果您累积 32 位整数作为键,那么在大多数平台上遇到内存问题之前,您将拥有数百万个唯一项。例如 262144 个 32 位 int 键将占用 1 兆字节。根据您的情况,您可能在此之前很久就重置了该过程。