7

我有一个想用 Rx 处理的用户交互场景。

该场景类似于规范的“当用户停止输入时,做一些工作”(通常,搜索用户到目前为止输入的内容)(1) - 但我还需要:

  • (2) 只获取“做一些工作”单位的最新结果(见下文)
  • (3)当一个新的工作单元开始时,取消任何正在进行的工作(在我的情况下它是 CPU 密集型的)

对于(1)我使用IObservable用户事件,限制.Throttle()为仅触发事件之间的暂停(“用户停止输入”)。

由此,我.Select(_ => CreateMyTask(...).ToObservable())

这给了我IObservable<IObservable<T>>每个内部可观察对象包装单个任务的位置。

为了得到(2)我最终申请.Switch()只从最新的工作单元中得到结果。

(3) - 取消挂起的任务怎么样?

如果我理解正确,只要有一个新的 inner IObservable<T>,该.Switch()方法就会订阅它并取消订阅以前的,从而导致它们Dispose().
也许可以以某种方式连接以触发任务取消?

4

2 回答 2

13

您可以使用Observable.FromAsyncwhich 将生成在观察者取消订阅时被取消的令牌:

input.Throttle(...)
     .Select(_ => Observable.FromAsync(token => CreateMyTask(..., token)))
     .Switch()
     .Subscribe(...);

这将为每个工作单元生成一个新令牌,并在每次Switch切换到新令牌时取消它。

于 2013-08-28T16:14:32.647 回答
3

您必须使用任务吗?

如果您乐于纯粹使用 Observables,那么您可以自己很好地完成这项工作。

尝试做这样的事情:

var query =
    Observable.Create<int>(o =>
    {
        var cancelling = false;
        var cancel = Disposable.Create(() =>
        {
            cancelling = true;
        });
        var subscription = Observable.Start(() =>
        {
            for (var i = 0; i < 100; i++)
            {
                Thread.Sleep(10); //1000 ms in total
                if (cancelling)
                {
                    Console.WriteLine("Cancelled on {0}", i);
                    return -1;
                }
            }
            Console.WriteLine("Done");
            return 42;
        }).Subscribe(o);
        return new CompositeDisposable(cancel, subscription);
    });

这个 observable 在 for 循环中使用 做一些艰苦的工作Thread.Sleep(10);,但是当 observable 被处理时,循环退出并且密集的 CPU 工作停止。然后你可以使用标准的 RxDisposeSwitch取消正在进行的工作。

如果您希望将其捆绑在一个方法中,请尝试以下操作:

public static IObservable<T> Start<T>(Func<Func<bool>, T> work)
{
    return Observable.Create<T>(o =>
    {
        var cancelling = false;
        var cancel = Disposable
            .Create(() => cancelling = true);
        var subscription = Observable
            .Start(() => work(() => cancelling))
            .Subscribe(o);
        return new CompositeDisposable(cancel, subscription);
    });
}

然后用这样的函数调用它:

Func<Func<bool>, int> work = cancelling =>
{
    for (var i = 0; i < 100; i++)
    {
        Thread.Sleep(10); //1000 ms in total
        if (cancelling())
        {
            Console.WriteLine("Cancelled on {0}", i);
            return -1;
        }
    }
    Console.WriteLine("Done");
    return 42;
};

这是我证明这有效的代码:

var disposable =
    ObservableEx
        .Start(work)
        .Subscribe(x => Console.WriteLine(x));

Thread.Sleep(500);
disposable.Dispose();

我的输出是“Cancelled on 50”(有时是“Cancelled on 51”)。

于 2013-08-28T02:33:33.847 回答