我有一个大的 ReplaySubject,它允许订阅者正常接收重播缓冲区和未来的通知。此外,我希望能够获取当前缓冲区的“快照”并将其作为列表同步返回,而无需订阅。
有没有办法做到这一点?
谢谢
我有一个大的 ReplaySubject,它允许订阅者正常接收重播缓冲区和未来的通知。此外,我希望能够获取当前缓冲区的“快照”并将其作为列表同步返回,而无需订阅。
有没有办法做到这一点?
谢谢
您可以不只是订阅,接收项目,然后取消订阅吗?
public static List<T> Snapshot<T>(ReplaySubject<T> subject)
{
List<T> snapshot = new List<T>();
using (subject.Subscribe(item => snapshot.Add(item))
{
// Deliberately empty; subscribing will add everything to the list.
}
return snapshot;
}
当然,这是假设订阅 aReplaySubject<T>
会同步调用元素处理程序。你想检查一下,但这就是我所期望的。
您还应该考虑是否要以某种方式处理错误/完成。
只是因为@PaulBetts说只有一种方法可以做到:)
注意:我不推荐这种方式;使用 Skeet 的方法,你以后会感谢自己的。
所以 all 的神奇之处在于它将通过internalReplaySubject<T>
接收到的任何值排队。因此,我们可以编写一个包装器,在重播主题的私有细节中摆弄以获取该信息:OnNext
Queue<TimeInterval<T>>
public class FixedReplaySubject<T> : ISubject<T>
{
private ReplaySubject<T> _inner;
private Func<Queue<TimeInterval<T>>> _snapshotGetter;
public FixedReplaySubject(ReplaySubject<T> source)
{
_inner = source;
var expr = Expression.Lambda(
typeof(Func<Queue<TimeInterval<T>>>),
Expression.Field(
Expression.Constant(source),
source.GetType()
.GetField("_queue", BindingFlags.NonPublic|BindingFlags.Instance)));
_snapshotGetter = (Func<Queue<TimeInterval<T>>>)expr.Compile();
}
public IEnumerable<TimeInterval<T>> Snapshot()
{
return _snapshotGetter();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _inner.Subscribe(observer);
}
public void OnNext(T value)
{
_inner.OnNext(value);
}
public void OnCompleted()
{
_inner.OnCompleted();
}
public void OnError(Exception error)
{
_inner.OnError(error);
}
public void Dispose()
{
_inner.Dispose();
}
}
试验台:
void Main()
{
var src = new ReplaySubject<int>();
src.OnNext(1);
src.OnNext(2);
src.OnNext(3);
src.OnNext(4);
src.OnNext(5);
src.OnNext(6);
var heh = new FixedReplaySubject<int>(src);
heh.Snapshot().Dump();
}
结果:(TimeInterval<T>
只是价值+它进来的时间)
1 00:00:00.0010265
2 00:00:00.0010278
3 00:00:00.0010278
4 00:00:00.0010282
5 00:00:00.0010282
6 00:00:00.0010286