2

我是 Rx 的新手,所以请耐心等待。

我想将 a 包裹Task<T>在一个IObservable<T>. 到目前为止,一切都很好:

Task<T> task = Task.Factory.StartNew(...);
IObservable<T> obs = task.ToObservable();

现在,我想要的是在观察者取消订阅时发出取消任务的信号:

var cancel = new CancellationToken();
Task<T> task = Task.Factory.StartNew(..., cancel);

IObservable<T> obs = task.ToObservable(); //there should be a way to tie the cancel token
                                          //to the IObservable (?)

IDisposable disposable = obs.Subscribe(...);
Thread.Sleep(1000);
disposable.Dispose(); // this should signal the task to cancel

我怎么做?

FWIW 这里是产生这个切线的场景:Rx and tasks - 当新任务产生时取消正在运行的任务?

4

2 回答 2

2

假设你有这样的方法:

Task<Gizmo> GetGizmoAsync(int id, CancellationToken cancellationToken);

您可以使用以下方法将其转换为IObservable<Gizmo>订阅开始Task<Gizmo>和取消订阅取消它的位置。

IObservable<Gizmo> observable = Observable.FromAsync(
    cancellationToken => GetGizmoAsync(7, cancellationToken));

// Starts the task:
IDisposable subscription = observable.Subscribe(...);

// Cancels the task if it is still running:
subscription.Dispose();
于 2015-04-24T16:33:34.167 回答
2

这是我能想到的最简单的方法,使用Observable.Create

static IObservable<int> SomeRxWork()
{
    return Observable.Create<int>(o =>
    {
        CancellationTokenSource cts = new CancellationTokenSource();
        IDisposable sub = SomeAsyncWork(cts.Token).ToObservable().Subscribe(o);
        return new CompositeDisposable(sub, new CancellationDisposable(cts));
    });
}

static Task<int> SomeAsyncWork(CancellationToken token);

我在评论中暗示的最初方式实际上相当冗长:

static IObservable<int> SomeRxWork()
{
    return Observable.Create<int>(async (o, token) =>
    {
        try
        {
            o.OnNext(await SomeAsyncWork(token));
            o.OnCompleted();
        }
        catch (OperationCanceledException)
        {
        }
        catch (Exception ex)
        {
            o.OnError(ex);
        }
    });
}
于 2013-08-28T11:51:36.670 回答