我一直致力于使用Reactive Extensions for Twitter's streaming APIsIObservable<T>
创建一个实现。
从较高级别发送 HTTP 请求并保持连接打开。发送以长度为前缀的项目以供消费。
Stream.ReadAsync
基本上,这是使用await
关键字的循环调用。IObserver<T>
接口实现(来自Dataflow library 或来自Dataflow libraryObservable.Create
的块,没关系,它是一个实现细节)被传递给这个循环,然后调用实现上的方法,产生 observable。IObserver<T>
但是,在此循环开始处理之前必须完成许多需要调用-returning 方法的事情,在 C# 5.0 中使用关键字Task<T>
更容易调用所有这些事情。await
像这样的东西:
public async Task<IObservable<string>> Create(string parameter,
CancellationToken cancellationToken)
{
// Make some call that requires await.
var response = await _httpClient.SendAsync(parameter, cancellationToken).
ConfigureAwait(false);
// Create a BufferBlock which will be the observable.
var block = new BufferBlock<T>();
// Start some background task which will use the block and publish to it
// on a separate task. This is not something that is awaited for.
ConsumeResponse(response, block, cancellationToken);
// Create the observable.
return block.AsObservable();
}
也就是说,我目前正在Task<IObservable<T>>
从我的方法中返回 a ,但我觉得我在 Reactive Extensions 中遗漏了一些东西,这些东西可以让我用来await
促进我需要进行的调用,但也返回 aIObservable<T>
而不是 a Task<IObservable<T>>
。
Reactive Extensions 中的什么方法允许我创建一个 observable,它需要在从创建方法返回之前等待方法?
我发现的最接近的是Observable.DeferAsync
. 假设对我的方法的调用和对 observable 的使用是这样的:
public async Task Observe()
{
// NOT the real name of the interface, but explains it's role here.
IObservableFactory factory;
// Create is really named something else.
IObservable<string> observable = factory.Create("parameter");
// Subscribe.
observable.Subscribe(o => Console.WriteLine("Observed: {0}", o));
// Wait.
await observable;
}
在这里使用DeferAsync
将不起作用,因为调用Subscribe
将发送第一个请求,然后读取该请求,然后调用await
onobservable
将创建第二个订阅,但会创建一个不同的observable。
或者,最终是Task<IObservable<T>>
在响应式框架中返回执行此操作的适当方法吗?
随后,由于该方法返回 a Task<T>
,因此最好传递 aCancellationToken
来取消操作。也就是说,我可以理解CancellationToken
用于取消可观察对象的创建,但它是否也可以用于取消实际的可观察对象(因为它可以向下传递以读取流等)。
我的直觉说不,因为这里违反了关注点分离以及取消的 DRY 原则:
- 创建的取消和可观察的取消是两个不同的事情。
- 调用
Subscribe
将返回一个IDisposable
取消订阅的实现。