6

我一直致力于使用Reactive Extensions for Twitter's streaming APIsIObservable<T>创建一个实现。

从较高级别发送 HTTP 请求并保持连接打开。发送以长度为前缀的项目以供消费。

Stream.ReadAsync基本上,这是使用await关键字的循环调用。IObserver<T>接口实现(来自Dataflow library 或来自Dataflow libraryObservable.Create的块,没关系,它是一个实现细节)被传递给这个循环,然后调用实现上的方法,产生 observable。IObserver<T>

但是,在此循环开始处理之前必须完成许多需要调用-returning 方法的事情,在 C# 5.0 中使用关键字Task<T>更容易调用所有这些事情。await像这样的东西:

public async Task<IObservable<string>> Create(string parameter,
    CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).
         ConfigureAwait(false);

     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<T>();

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);

     // Create the observable.
     return block.AsObservable();
}

也就是说,我目前正在Task<IObservable<T>>从我的方法中返回 a ,但我觉得我在 Reactive Extensions 中遗漏了一些东西,这些东西可以让我用来await促进我需要进行的调用,但也返回 aIObservable<T>而不是 a Task<IObservable<T>>

Reactive Extensions 中的什么方法允许我创建一个 observable,它需要在从创建方法返回之前等待方法?

我发现的最接近的是Observable.DeferAsync. 假设对我的方法的调用和对 observable 的使用是这样的:

public async Task Observe()
{
    // NOT the real name of the interface, but explains it's role here.
    IObservableFactory factory;

    // Create is really named something else.
    IObservable<string> observable = factory.Create("parameter");

    // Subscribe.
    observable.Subscribe(o => Console.WriteLine("Observed: {0}", o));

    // Wait.
    await observable;
}

在这里使用DeferAsync将不起作用,因为调用Subscribe将发送第一个请求,然后读取该请求,然后调用awaitonobservable将创建第二个订阅,但会创建一个不同的observable。

或者,最终是Task<IObservable<T>>在响应式框架中返回执行此操作的适当方法吗?

随后,由于该方法返回 a Task<T>,因此最好传递 aCancellationToken来取消操作。也就是说,我可以理解CancellationToken用于取消可观察对象的创建,但它是否也可以用于取消实际的可观察对象(因为它可以向下传递以读取流等)。

我的直觉说不,因为这里违反了关注点分离以及取消的 DRY 原则:

  • 创建的取消和可观察的取消是两个不同的事情。
  • 调用Subscribe将返回一个IDisposable取消订阅的实现。
4

2 回答 2

8

我不会返回一个Task<IObservable<T>>. 在你的公共 API 中混合任务和 Observable 最终会让人感到困惑。请记住,可以将任务视为产生单个值的可观察对象。这也意味着不要在公共 API 中将 CancellationTokens 与 observables 混合。您可以通过订阅和取消订阅来控制 observables。

这并不意味着你不能在幕后混合概念。以下是如何做你想做的事Observable.UsingTask.ToObservableCancellationDisposable

首先,修改您的方法以返回Task<ISourceBlock<string>>

public async Task<ISourceBlock<string>> CreateBlock(string parameter, CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).ConfigureAwait(false);

     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<T>();

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);

     return block;
}

现在这是使用上述方法的新 Create 方法:

public IObservable<string> Create(string parameter)
{
    // Create a cancellation token that will be canceled when the observable is unsubscribed, use this token in your call to CreateBlock.
    // Use ToObservable() to convert the Task to an observable so we can then
    // use SelectMany to subscribe to the block itself once it is available
    return Observable.Using(() => new CancellationDisposable(),
           cd => CreateBlock(parameter, cd.Token)
               .ToObservable()
               .SelectMany(block => block.AsObservable()));
}

编辑:我发现 Rx 已经实现了这个模式FromAsync

public IObservable<string> Create(string parameter)
{
    return Observable.FromAsync(token => CreateBlock(parameter, token))
                     .SelectMany(block => block.AsObservable());
}

而且DeferAsync,这更合适,因为您Task实际上是在创建您真正想要观察的 Observable(例如您的块):

public IObservable<string> Create(string parameter)
{
    return Observable.DeferAsync(async token => (await CreateBlock(parameter, token)).AsObservable());
}
于 2013-04-01T03:04:05.923 回答
2

我不明白为什么你需要BufferBlock在那之后创建唯一的await。相反,您可以做的是有一个同步方法来创建BufferBlock,开始异步初始化,然后立即返回。就像是:

public IObservable<string> Create(
    string parameter, CancellationToken cancellationToken)
{
     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<string>();

     // Start asynchronous initialization, but don't wait for the result
     InitializeAsync(parameter, block, cancellationToken);

     // Create the observable.
     return block.AsObservable();
}

private async Task InitializeAsync(
    string parameter, ITargetBlock<string> block,
    CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).
         ConfigureAwait(false);

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);
}

(您可能还想InitializeAsync()通过调用Fault()传入的块来处理错误。)

这样,该Create()方法只返回IObservable<T>,但它也异步执行初始化。

或者,最终是Task<IObservable<T>>在响应式框架中返回执行此操作的适当方法吗?

我不这么认为。我认为这里不需要两个级别的异步。

创建的取消和可观察的取消是两个不同的事情。

这将取决于您的确切要求,但总的来说,我认为它们不是独立的东西。您想取消操作,它是否已经开始并不重要。

这类似于CancellationToken传递到Task.Run()的行为方式:它既用于Task在开始执行之前取消,也用于检测是否已启动的正确取消。

调用Subscribe将返回一个IDisposable取消订阅的实现。

是的,但仅此而已。由于您在这里描述的是您想要一个热可观察对象(无论观察者如何都会产生项目),因此它实际上不会取消可观察对象,只会取消订阅。

于 2013-03-31T17:44:08.343 回答