2

我想要做的声明如下所示:

// Checks input source for timeouts, based on the number of elements received 
// from clock since the last one received from source. 
// The two selectors are used to generate output elements.
public static IObservable<R> TimeoutDetector<T1,T2,R>(
        this IObservable<T1> source, 
        IObservable<T2> clock, 
        int countForTimeout,
        Func<R> timedOutSelector, 
        Func<T1, R> okSelector)

大理石图在 ascii 中很困难,但这里有:

source --o---o--o-o----o-------------------o---
clock  ----x---x---x---x---x---x---x---x---x---
output --^---^--^-^----^-----------!-------^---

我已经尝试寻找Observable可以组合sourceclock以我可以使用的方式的现有函数,但是大多数组合函数依赖于接收“每个”(AndZip),或者它们从'缺少'一个 ( CombineLatest),或者它们离我需要的太远了 ( Amb, GroupJoin, Join, Merge, SelectMany, Timeout)。Sample看起来很接近,但我不想将源吞吐量限制为时钟速率。

所以现在我被困在试图填补这里的巨大空白:

return new AnonymousObservable<R>(observer =>
{
    //One observer, two observables??
});

抱歉,“您尝试过什么”部分在这里有点弱:假设我已经尝试过思考它!我不是要求完整的实施,只是:

  • 是否有一个内置功能可以帮助我,我错过了?
  • 如何构建一个订阅两个可观察对象的基于 lambda 的观察者?
4

4 回答 4

6

我知道您并没有要求完全实施,但我认为这是一个解决方案:

public static IObservable<TR> TimeoutDetector<T1, T2, TR>(
    this IObservable<T1> source,
    IObservable<T2> clock,
    int countForTimeout,
    Func<TR> timedOutSelector,
    Func<T1, TR> okSelector)
{
    return source
        .Select(i => clock.Take(countForTimeout).LastAsync())
        .Switch().Select(_ => timedOutSelector())
        .Merge(source.Select(okSelector));
}

它的工作原理如下 - 我注意到您的输出是 okSelector 投影的源,与超时事件合并。所以我专注于产生超时事件,因为其余的很容易。

这个想法是在每次源发射时创建一个倒计时,并在每个时钟脉冲上递减这个倒计时。如果源发出,我们会中止倒计时,否则当倒计时达到 0 时,我们会产生一个 timedOut 事件。

分解它:

  1. 将每个源元素投影到一个接收第一个countForTimeout元素的流中——注意时钟流必须是一个“热”的 observable,因为我们在每个 countDown 事件上都订阅它。时钟流变热是很正常的。如果这发生了事件,我们就会超时。
  2. Switch将丢弃除最新倒计时流之外的所有内容。
  3. 用于Select投影到 timedOut 事件。
  4. 现在只需合并源事件。

这是我使用的单元测试,旨在与您的大理石图非常相似(nuget rx-testing & nunit 用于编译必要的库):

    [Test]
    public void AKindOfTimeoutTest()
    {
        var scheduler = new TestScheduler();

        var clockStream = scheduler.CreateHotObservable(
            OnNext(100, Unit.Default),
            OnNext(200, Unit.Default),
            OnNext(300, Unit.Default),
            OnNext(400, Unit.Default),
            OnNext(500, Unit.Default),
            OnNext(600, Unit.Default),
            OnNext(750, Unit.Default), /* make clock funky! */
            OnNext(800, Unit.Default),
            OnNext(900, Unit.Default));


        var sourceStream = scheduler.CreateColdObservable(
            OnNext(50, 1),
            OnNext(150, 2),
            OnNext(250, 3),
            OnNext(275, 4),
            OnNext(400, 5),
            OnNext(900, 6));


        Func<int> timedOutSelector = () => 0;
        Func<int, int> okSelector = i => i;

        var results = scheduler.CreateObserver<int>();

        sourceStream.TimeoutDetector(clockStream, 3, timedOutSelector, okSelector)
                    .Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(50, 1),
            OnNext(150, 2),
            OnNext(250, 3),
            OnNext(275, 4),
            OnNext(400, 5),
            OnNext(750, 0),
            OnNext(900, 6));
    }
}

尝试回答您的具体问题:

  • 问:有没有可以帮助我的内置功能,但我错过了?A. 可能扫描是关键。
  • 问:如何构建一个订阅两个 observable 的基于 lambda 的观察者?A. 不太清楚你的意思......有很多组合流的方法,你提到了其中的大部分。
于 2013-11-07T08:35:44.703 回答
2

这是我提到的 Observable.Create 方法(相同的测试工作):

public static IObservable<TR> TimeoutDetector<T1, T2, TR>(
    this IObservable<T1> source,
    IObservable<T2> clock,
    int countForTimeout,
    Func<TR> timedOutSelector,
    Func<T1, TR> okSelector)
{
    return Observable.Create<TR>(observer =>
        {
            var counter = countForTimeout;

            var timeoutSub = clock.Subscribe(_ =>
                {
                    var count = Interlocked.Decrement(ref counter);
                    if (count == 0)
                    {
                        observer.OnNext(timedOutSelector());
                    }
                },
                observer.OnError,
                observer.OnCompleted);

            var sourceSub = source.Subscribe(
                i =>
                {
                    Interlocked.Exchange(ref counter, countForTimeout);
                    observer.OnNext(okSelector(i));
                },
                observer.OnError,
                observer.OnCompleted);

            return new CompositeDisposable(sourceSub, timeoutSub);
        });
}

请注意,Observable.Create 对于确保使用正确的 Rx 语法非常有帮助(即流发出 OnNext* (OnError | OnCompleted)? - 这意味着我最多可以稍微放松一下发送 OnError 或 OnCompleted 一次。

于 2013-11-07T09:56:06.520 回答
0

我想出了这个,这比詹姆斯的回答要漂亮得多。

public static IObservable<R> TimeoutDetector2<T1, T2, R>(
        this IObservable<T1> source, 
        IObservable<T2> clock, int maxDiff,
        Func<R> timedOutSelector, Func<T1, R> okSelector)
{
    return new AnonymousObservable<R>(observer =>
    {
        int counter = 0;
        object gate = new object();
        bool error = false;
        bool completed = false;
        bool timedOut = false;
        var sourceSubscription = source.Subscribe(
            x =>
            {
                lock(gate)
                {
                    if(!error && !completed) observer.OnNext(okSelector(x));
                    counter = 0;
                    timedOut = false;
                }
            },
            ex =>
            {
                lock(gate)
                {
                    error = true;
                    if(!completed) observer.OnError(ex);
                }
            },
            () =>
            {
                lock(gate)
                {
                    completed = true;
                    if(!error) observer.OnCompleted();
                }
            });
        var clockSubscription = clock.Subscribe(
            x =>
            {
                lock(gate)
                {
                    counter = counter + 1;
                    if(!error && !completed && counter > maxDiff && !timedOut)
                    {
                        timedOut = true;
                        observer.OnNext(timedOutSelector());
                    }
                }
            },
            ex =>
            {
                lock(gate)
                {
                    error = true;
                    if(!completed) observer.OnError(ex);
                }
            },
            () =>
            {
                lock(gate)
                {
                    completed = true;
                    if(!error) observer.OnCompleted();
                }
            });

        //need to return a subscription
        return new CompositeDisposable(sourceSubscription, clockSubscription);
    }).Publish().RefCount(); // prevent subscribers provoking more than one subscription to source and clock
}
于 2013-11-07T09:30:22.267 回答
0

当然,这是一个老问题。我一直在寻找更高级的东西,timeoutWith以便我可以取消超时逻辑。

我想知道超时是否几乎等同于:

race(throwError('timedout').pipe(delay(10000)), yourObs$)

那么这里显示的“throwError”当然是可以取消的。

如果你想知道为什么——我有一些由可观察链控制的“步骤”,我有一个超时。但是,如果其中一个步骤包括打开一个对话框,那么我希望取消超时!

于 2019-01-25T20:06:25.000 回答