我正在尝试创建一个 Observable,其中每个项目都是通过异步任务生成的。下一项应通过对上一项的结果(共同递归)的异步调用来生成。在“生成”的说法中,这看起来像这样 - 除了生成不支持异步(也不支持初始状态的委托)。
var ob = Observable.Generate(
async () => await ProduceFirst(), // Task<T> ProduceFirst()
prev => Continue(prev) // bool Continue(T);
async prev => await ProduceNext(prev) // Task<T> ProduceNext(T)
item => item
);
作为更具体的示例,要通过一次获取 100 条消息来查看 ServiceBus 队列中的所有消息,请实现 ProduceFirst、Continue 和 ProduceNext,如下所示:
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
const int batchSize = 100;
return _serviceBusReceiver.PeekBatchAsync(batchSize);
}
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
const int batchSize = 100;
return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}
然后调用.SelectMany(i => i)
把IObservable<IEnumerable<BrokeredMessage>>
它变成一个IObservable<BrokeredMessage>
其中 _serviceBusReceiver 是一个接口的实例,如下所示:
public interface IServiceBusReceiver
{
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}
BrokeredMessage 来自https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx