24

如何清除 a 上的缓冲区ReplaySubject

我需要定期清除缓冲区(在我的情况下作为一天结束的事件)以防止ReplaySubject不断增长并最终吃掉所有内存。

理想情况下,我希望保持不变ReplaySubject,因为客户订阅仍然很好。

4

3 回答 3

22

ReplaySubject不提供清除缓冲区的方法,但是有几个重载可以以不同的方式约束其缓冲区:

  • TimeSpan项目保留的最大值
  • 最大项目数
  • 上述的组合,只要满足任一条件,就会丢弃物品。

可清除的重播主题

这是一个非常有趣的问题——我决定看看使用现有的主题和运算符(因为它们非常健壮)来实现ReplaySubject可以清除的变体是多么容易。事实证明这相当简单。

我已经通过内存分析器运行它以检查它是否正确。调用Clear()以刷新缓冲区,否则它就像常规的 unbounded 一样工作ReplaySubject

public class RollingReplaySubject<T> : ISubject<T>
{
    private readonly ReplaySubject<IObservable<T>> _subjects;
    private readonly IObservable<T> _concatenatedSubjects;
    private ISubject<T> _currentSubject;

    public RollingReplaySubject()
    {
        _subjects = new ReplaySubject<IObservable<T>>(1);
        _concatenatedSubjects = _subjects.Concat();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void Clear()
    {
        _currentSubject.OnCompleted();
        _currentSubject = new ReplaySubject<T>();
        _subjects.OnNext(_currentSubject);
    }

    public void OnNext(T value)
    {
        _currentSubject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        _currentSubject.OnError(error);
    }

    public void OnCompleted()
    {
        _currentSubject.OnCompleted();
        _subjects.OnCompleted();     
        // a quick way to make the current ReplaySubject unreachable
        // except to in-flight observers, and not hold up collection
        _currentSubject = new Subject<T>();       
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _concatenatedSubjects.Subscribe(observer);
    }
}

尊重通常的规则(与 any 一样Subject)并且不要同时调用此类上的方法 - 包括Clear(). 如果需要,您可以轻松添加同步锁。

它通过在主 ReplaySubject 中嵌套一系列 ReplaySubject 来工作。外部 ReplaySubject ( _subjects) 拥有一个恰好包含一个内部 ReplaySubject ( _currentSubject) 的缓冲区,并且在构造时填充。

这些OnXXX方法调用_currentSubjectReplaySubject。

观察者订阅了嵌套 ReplaySubjects 的串联投影(保存在 中_concatenatedSubjects)。因为缓冲区大小_subjects仅为 1,所以新订阅者仅获取最近的事件ReplaySubject

每当我们需要“清除缓冲区”时,现有的_currentSubjectisOnCompleted和一个新的 ReplaySubject 被添加到_subjects并成为新的_currentSubject.

增强功能

按照@Brandon 的建议,我创建了一个版本,RollingReplaySubject它使用一个TimeSpan或一个输入流来表示缓冲区清除。我在这里为此创建了一个要点:https ://gist.github.com/james-world/c46f09f32e2d4f338b07

于 2015-03-09T15:13:45.590 回答
0

很可能您已经有一个 Observable 数据源,在这种情况下,这里是另一种解决方案。这个使用现有 RX 构造的组合,而不是构建自己的 ISubject,我个人对此持谨慎态度。

public class ClearableReplaySubject<TSource, TClearTrigger> : IConnectableObservable<TSource>
{
    private readonly IConnectableObservable<IObservable<TSource>> _underlying;
    private readonly SerialDisposable _replayConnectDisposable = new SerialDisposable();

    public ClearableReplaySubject(IObservable<TSource> src, IObservable<TClearTrigger> clearTrigger)
    {
        _underlying = clearTrigger.Select(_ => Unit.Default).StartWith(Unit.Default)
            .Select(_ =>
            {
                var underlyingReplay = src.Replay();
                _replayConnectDisposable.Disposable = underlyingReplay.Connect();
                return underlyingReplay;
            })
            .Replay(1);
    }

    public IDisposable Subscribe(IObserver<TSource> observer)
    {
        return _underlying.Switch().Subscribe(observer);
    }

    public IDisposable Connect()
    {
        return new CompositeDisposable(_underlying.Connect(), _replayConnectDisposable.Disposable);
    }
}

如果将以下扩展方法添加到 ObservableEx:

public static class ObservableEx
{
    public static IConnectableObservable<TItem> ReplayWithReset<TItem, TReset>(this IObservable<TItem> src, IObservable<TReset> resetTrigger)
    {
        return new ClearableReplaySubject<TItem, TReset>(src, resetTrigger);
    }
}

然后您可以使用您的源并添加.ReplayWithReset(...)与您的重置触发器 Observable。这可能是一个计时器或其他任何东西。

var replay = sourceObservable.ReplayWithReset(triggerObservable);
var connection = replay.Connect();

连接的行为方式与重播相同。

于 2015-03-24T13:31:34.393 回答
0

好吧,我不了解 c#,但我设法在重播主题 rxdart 中完成了它。至于 replaysubject,它使用队列来缓存事件,所以我修改了 replaysubject 类。

  1. 我将所有队列更改为 List
  2. 添加了 onRemove 方法,该方法将从 chached 列表中删除事件。

原重播主题:

class ReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
final Queue<T> _queue;
final int _maxSize;

/// Constructs a [ReplaySubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
///
/// See also [StreamController.broadcast]
factory ReplaySubject({
    int maxSize,
    void Function() onListen,
    void Function() onCancel,
    bool sync = false,
}) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
        onListen: onListen,
        onCancel: onCancel,
        sync: sync,
    );

    final queue = Queue<T>();

    return ReplaySubject<T>._(
        controller,
        Rx.defer<T>(
                () => controller.stream.startWithMany(queue.toList(growable: false)),
            reusable: true,
        ),
        queue,
        maxSize,
    );
}

ReplaySubject._(
    StreamController<T> controller,
    Stream<T> stream,
    this._queue,
    this._maxSize,
    ) : super(controller, stream);

@override
void onAdd(T event) {
    if (_queue.length == _maxSize) {
        _queue.removeFirst();
    }

    _queue.add(event);
}

@override
List<T> get values => _queue.toList(growable: false);
}

修改重播主题:

class ModifiedReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
final List<T> _list;
final int _maxSize;

/// Constructs a [ModifiedReplaySubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
///
/// See also [StreamController.broadcast]
factory ModifiedReplaySubject({
    int maxSize,
    void Function() onListen,
    void Function() onCancel,
    bool sync = false,
}) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
        onListen: onListen,
        onCancel: onCancel,
        sync: sync,
    );

    final queue = List<T>();

    return ModifiedReplaySubject<T>._(
        controller,
        Rx.defer<T>(
                () => controller.stream.startWithMany(queue.toList(growable: false)),
            reusable: true,
        ),
        queue,
        maxSize,
    );
}

ModifiedReplaySubject._(
    StreamController<T> controller,
    Stream<T> stream,
    this._list,
    this._maxSize,
    ) : super(controller, stream);

@override
void onAdd(T event) {
    if (_list.length == _maxSize) {
        _list.removeAt(0);
    }

    _list.add(event);
}

void onRemove(T event) {
    _list.remove(event);
}

@override
List<T> get values => _list.toList(growable: false);
}
于 2020-01-15T13:50:32.587 回答