13

我遇到的一个用例,我怀疑我不能是唯一的一个,是这样的方法:

IObservable<T> Observable.RepeatLastValueDuringSilence(this IObservable<T> inner, TimeSpan maxQuietPeriod);

这将从内部 observable 返回所有未来的项目,而且,如果内部 observable 在一段时间内(maxQuietPeriod)没有调用 OnNext,它只会重复最后一个值(当然直到内部调用 OnCompleted 或 OnError) .

理由是服务定期 ping 出定期状态更新。例如:

var myStatus = Observable.FromEvent(
    h=>this.StatusUpdate+=h,
    h=>this.StatusUpdate-=h);

var messageBusStatusPinger = myStatus
    .RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1))
    .Subscribe(update => _messageBus.Send(update));

这样的事情存在吗?还是我高估了它的用处?

谢谢,亚历克斯

PS:对于任何不正确的术语/语法,我深表歉意,因为我只是第一次探索 Rx。

4

4 回答 4

10

与 Matthew 的解决方案类似,但这里的计时器在源中收到每个元素后开始,我认为这更正确(但差异不太可能重要):

public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{    
    return inner.Select(x => 
        Observable.Interval(maxQuietPeriod)
                  .Select(_ => x)
                  .StartWith(x)
    ).Switch();
}

和测试:

var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "1")
                       .Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_ => "2"))
                       .Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "3"));

source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);

您应该看到1打印了 10 次(来自源的 5 次,在静默期间重复 5 次),然后2当您从源中获得一个时,从每个之间的静默中获得更多的 4 次,然后是无限的3

于 2012-07-13T05:18:13.353 回答
9

这个相当简单的查询可以完成这项工作:

var query =
    source
        .Select(s =>
            Observable
                .Interval(TimeSpan.FromSeconds(1.0))
                .StartWith(s)
                .Select(x => s))
        .Switch();

永远不要低估.Switch().

于 2012-07-13T09:41:49.113 回答
1

我认为这可以满足您的要求,如果您的 observable 不热,则需Publish要这样做Refcount

public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{
    var throttled = inner.Throttle(maxQuietPeriod);
    var repeating = throttled.SelectMany(i => 
        Observable
            .Interval(maxQuietPeriod)
            .Select(_ => i)
            .TakeUntil(inner));
    return Observable.Merge(inner, throttled, repeating);
}
于 2012-07-13T00:55:48.513 回答
-1

Rx 库中没有方法,但我也需要这样的方法。在我的用例中,即使源没有输出任何值,我也需要输出值。如果您不想在第一个源值出现之前输出任何值,您可以在 subscribe 调用之前删除defaultValue参数和调用。createTimer()

需要调度程序来运行计时器。一个明显的重载是不使用调度程序并选择默认调度程序(我使用了 ThreadPool 调度程序)。

Imports System.Reactive
Imports System.Reactive.Concurrency
Imports System.Reactive.Disposables
Imports System.Reactive.Linq

<Extension()>
Public Function AtLeastEvery(Of T)(source As IObservable(Of T), 
                                   timeout As TimeSpan, 
                                   defaultValue As T, 
                                   scheduler As IScheduler
                                  ) As IObservable(Of T)
    If source Is Nothing Then Throw New ArgumentNullException("source")
    If scheduler Is Nothing Then Throw New ArgumentNullException("scheduler")
    Return Observable.Create(
        Function(observer As IObserver(Of T))
            Dim id As ULong
            Dim gate As New Object()
            Dim timer As New SerialDisposable()
            Dim lastValue As T = defaultValue

            Dim createTimer As Action =
                Sub()
                    Dim startId As ULong = id
                    timer.Disposable = scheduler.Schedule(timeout,
                                           Sub(self As Action(Of TimeSpan))
                                               Dim noChange As Boolean
                                               SyncLock gate
                                                   noChange = (id = startId)
                                                   If noChange Then
                                                       observer.OnNext(lastValue)
                                                   End If
                                               End SyncLock
                                               'only restart if no change, otherwise
                                               'the change restarted the timeout
                                               If noChange Then self(timeout)
                                           End Sub)
                End Sub
            'start the first timeout
            createTimer()
            'subscribe to the source observable
            Dim subscription = source.Subscribe(
                Sub(v)
                    SyncLock gate
                        id += 1UL
                        lastValue = v
                    End SyncLock
                    observer.OnNext(v)
                    createTimer() 'reset the timeout
                End Sub,
                Sub(ex)
                    SyncLock gate
                        id += 1UL
                    End SyncLock
                    observer.OnError(ex)
                    'do not reset the timeout, because the sequence has ended
                End Sub,
                Sub()
                    SyncLock gate
                        id += 1UL
                    End SyncLock
                    observer.OnCompleted()
                    'do not reset the timeout, because the sequence has ended
                End Sub)

            Return New CompositeDisposable(timer, subscription)
        End Function)
End Function
于 2012-07-12T17:32:58.450 回答