6

我有以下设置

IObservable<Data> source = ...;

source
    .Select(data=>VeryExpensiveOperation(data))
    .Subscribe(data=>Console.WriteLine(data));

通常,这些事件会在合理的时间范围内分开。想象一下用户更新表单中的文本框。我们VeryExpensiveOperation 可能需要 5 秒钟才能完成,同时屏幕上会显示一个沙漏。

但是,如果在 5 秒内用户再次更新文本框,我想VeryExpensiveOperation 在新的开始之前向当前发送取消。

我会想象这样的场景

source
    .SelectWithCancel((data, cancelToken)=>VeryExpensiveOperation(data, token))
    .Subscribe(data=>Console.WriteLine(data));

因此,每次调用 lambda 时都会使用可用于管理取消的 cancelToken 来调用Task. 但是现在我们混合了 Task、CancelationToken 和 RX。不太确定如何将它们组合在一起。有什么建议么。

弄清楚如何使用 XUnit 测试操作员的奖励积分:)

第一次尝试

    public static IObservable<U> SelectWithCancelation<T, U>( this IObservable<T> This, Func<CancellationToken, T, Task<U>> fn )
    {
        CancellationTokenSource tokenSource = new CancellationTokenSource();

        return This
            .ObserveOn(Scheduler.Default)
            .Select(v=>{
                tokenSource.Cancel();
                tokenSource=new CancellationTokenSource();
                return new {tokenSource.Token, v};
            })
            .SelectMany(o=>Observable.FromAsync(()=>fn(o.Token, o.v)));
    }

尚未测试。我希望一个未完成的任务会生成一个 IObservable,它在不触发任何OnNext事件的情况下完成。

4

2 回答 2

13

您必须建模VeryExpensiveOperation为可取消的异步事物。一个Task或一个IObservable。我假设这是一个任务CancellationToken

Task<TResult> VeryExpensiveOperationAsync<TSource, TResult>(TSource item, CancellationToken token);

然后你这样做:

source
    .Select(item => Observable.DeferAsync(async token =>
    {
        // do not yield the observable until after the operation is completed
        // (ie do not just do VeryExpensiveOperation(...).ToObservable())
        // because DeferAsync() will dispose of the token source as soon
        // as you provide the observable (instead of when the observable completes)
        var result = await VeryExpensiveOperationAsync(item, token);
        return Observable.Return(result);
    })
    .Switch();

只是创建了一个延迟的Selectobservable,当它被订阅时,它将创建一个令牌并启动操作。如果在操作完成之前取消订阅 observable,则令牌将被取消。

订阅来自的Switch每个新的 observable Select,取消订阅它之前订阅的 observable。

这有你想要的效果。

PS这很容易测试。只需提供一个模拟源和一个VeryExpensiveOperation使用单元测试提供的模拟源,TaskCompletetionSource这样单元测试就可以准确控制何时生成新源项以及何时完成任务。像这样的东西:

void SomeTest()
{
    // create a test source where the values are how long
    // the mock operation should wait to do its work.
    var source = _testScheduler.CreateColdObservable<int>(...);

    // records the actions (whether they completed or canceled)
    List<bool> mockActionsCompleted = new List<bool>();
    var resultStream = source.SelectWithCancellation((token, delay) =>
    {
        var tcs = new TaskCompletionSource<string>();
        var tokenRegistration = new SingleAssignmentDisposable();

        // schedule an action to complete the task
        var d = _testScheduler.ScheduleRelative(delay, () =>
        {
           mockActionsCompleted.Add(true);
           tcs.SetResult("done " + delay);
           // stop listening to the token
           tokenRegistration.Dispose();
        });

        // listen to the token and cancel the task if the token signals
        tokenRegistration.Disposable = token.Register(() =>
        {
           mockActionsCompleted.Add(false);
           tcs.TrySetCancelled();
           // cancel the scheduled task
           d.Dispose();
        });

        return tcs.Task;
    });

    // subscribe to resultStream
    // start the scheduler
    // assert the mockActionsCompleted has the correct sequence
    // assert the results observed were what you expected.
}

testScheduler.Start()由于动态安排的新操作,您可能会在使用时遇到麻烦。一个while循环testScheduler.AdvanceBy(1)可能会更好。

于 2013-07-24T15:08:43.920 回答
-1

为什么不直接使用油门?

http://rxwiki.wikidot.com/101samples#toc30

Throttle 会停止事件流,直到在指定的时间段内不再产生事件。例如,如果您将文本框的 TextChanged 事件限制为 0.5 秒,则在用户停止输入 0.5 秒之前不会传递任何事件。这在您不想在每次击键后开始新搜索但想等到用户暂停的搜索框中很有用。

SearchTextChangedObservable = Observable.FromEventPattern<TextChangedEventArgs>(this.textBox, "TextChanged");
_currentSubscription = SearchTextChangedObservable.Throttle(TimeSpan.FromSeconds(.5)).ObserveOnDispatcher
于 2013-07-24T16:12:20.367 回答