6

我正在尝试创建一个 Observable,其中每个项目都是通过异步任务生成的。下一项应通过对上一项的结果(共同递归)的异步调用来生成。在“生成”的说法中,这看起来像这样 - 除了生成不支持异步(也不支持初始状态的委托)。

var ob = Observable.Generate(
   async () => await ProduceFirst(),        // Task<T> ProduceFirst()
   prev => Continue(prev)                   // bool Continue(T);
   async prev => await ProduceNext(prev)    // Task<T> ProduceNext(T)
   item => item
);

作为更具体的示例,要通过一次获取 100 条消息来查看 ServiceBus 队列中的所有消息,请实现 ProduceFirst、Continue 和 ProduceNext,如下所示:

Task<IEnumerable<BrokeredMessage>> ProduceFirst() 
{
    const int batchSize = 100;
    return _serviceBusReceiver.PeekBatchAsync(batchSize);
}

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    const int batchSize = 100;
    return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}

然后调用.SelectMany(i => i)IObservable<IEnumerable<BrokeredMessage>>它变成一个IObservable<BrokeredMessage>

其中 _serviceBusReceiver 是一个接口的实例,如下所示:

public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}

BrokeredMessage 来自https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx

4

5 回答 5

12

如果您要推出自己的异步Generate函数,我建议使用递归调度而不是包装一个 while 循环。

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
  var s = scheduler ?? Scheduler.Default;

  return Observable.Create<TResult>(async obs => {
    return s.Schedule(await initialState(), async (state, self) => 
    {
      if (!condition(state))
      {
        obs.OnCompleted();
        return;
      }

      obs.OnNext(resultSelector(state));

      self(await iterate(state));

    });
  });
}

这有几个优点。首先,您可以取消它,使用简单的 while 循环无法直接取消它,实际上您甚至在 observable 完成之前都不会返回订阅函数。其次,这使您可以控制每个项目的调度/异步性(这使测试变得轻而易举),这也使其更适合图书馆

于 2015-07-18T00:59:28.203 回答
5

经过大量测试后,我认为使用内置的 Rx 运算符可以很好地完成这项工作。

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
    return Observable.Create<TResult>(o =>
    {
        var current = default(TResult);
        return
            Observable
                .FromAsync(initialState)
                .Select(y => resultSelector(y))
                .Do(c => current = c)
                .Select(x =>
                    Observable
                        .While(
                            () => condition(current),
                            Observable
                                .FromAsync(() => iterate(current))
                                .Select(y => resultSelector(y))
                        .Do(c => current = c))
                        .StartWith(x))
                .Switch()
                .Where(x => condition(x))
                .ObserveOn(scheduler ?? Scheduler.Default)
                .Subscribe(o);
    });
}

我已经使用以下代码测试了此代码:

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
    return
        Task.FromResult(
            EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = 1
                }));
}

Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    return Task.FromResult(
        prev.Last().SequenceNumber < 3
            ? EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = prev.Last().SequenceNumber + 1 
                })
            : Enumerable.Empty<BrokeredMessage>());
}

public class BrokeredMessage
{
    public int SequenceNumber;
}

并运行这个序列:

var ob = Generate(
    async () => await ProduceFirst(),
    prev => Continue(prev),
    async prev => await ProduceNext(prev),
    item => item);

我得到了这个结果:

结果

我的测试代码还使用了响应式扩展团队的交互式扩展 - NuGet“Ix-Main”。

于 2015-07-18T03:33:57.643 回答
1

我自己也有类似的问题,也同意以下评论:

我可能违反了反应范式的精神,但这正是我目前所需要的——它不应该继续从队列中提取消息,直到它们可以被处理(至少在不久的将来)。

我相信IAsyncEnumerableIx.NET 比IObservable这种情况更适合——对于这里的问题和任何类似的异步展开功能。原因是每次我们迭代然后从 a 中提取结果时Task,流程控制都由我们(调用者)来拉下一项,或者在满足特定条件时选择不拉下一项。这是 likeIAsyncEnumerable和 not like IObservable,它在我们无法控制速率的情况下将项目推送给我们。

Ix.NET 没有合适的版本,AsyncEnumerable.Generate所以我写了以下内容来解决这个问题。

   public static IAsyncEnumerable<TState> Generate<TState>(TState initialState, Func<TState, bool> condition, Func<TState, Task<TState>> iterate)
    {
        return AsyncEnumerable.CreateEnumerable(() =>
        {
            var started = false;
            var current = default(TState);
            return AsyncEnumerable.CreateEnumerator(async c =>
            {

                if (!started)
                {
                    started = true;
                    var conditionMet = !c.IsCancellationRequested && condition(initialState);
                    if (conditionMet) current = initialState;
                    return conditionMet;
                }
                {
                    var newVal = await iterate(current).ConfigureAwait(false);
                    var conditionMet = !c.IsCancellationRequested && condition(newVal);
                    if (conditionMet) current = newVal;
                    return conditionMet;
                }

            },
                () => current,
                () => { });
        });



    }

笔记:

  • 只有非常轻微的测试。
  • 确实返回初始状态。
  • 不返回第一个不符合条件的 TState,即使它已经完成了获得该结果的工作。可能一个不同的版本可能包括那个。
  • 我宁愿去掉这个condition参数,因为它是一个拉系统,完全取决于调用者是否调用 MoveNext,因此condition 似乎是多余的。它本质TakeWhile上是在函数的结果上添加一个调用。但是,我对 Ix.NET 的研究还不够深入,无法知道是否需要来自的false响应,因此我将其包含在内。MoveNextdisposeIAsyncEnumerator

IAsyncEnumerableIObservable如果需要该特定类型,当然可以转换为。

于 2018-10-25T03:37:27.413 回答
1

这是另一个实现,灵感来自 Enigmativity 的答案。它使用更新的语言特性(C# 7元组解构)。

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null)
{
    return Observable.Create<TResult>(observer =>
    {
        var (isFirst, current) = (true, default(TResult));
        return Observable
            .While(() => isFirst || condition(current),
                Observable.If(() => isFirst,
                    Observable.FromAsync(ct => initialState()),
                    Observable.FromAsync(ct => iterate(current))
                )
            )
            .Do(x => (isFirst, current) = (false, x))
            .Select(x => resultSelector(x))
            .ObserveOn(scheduler ?? Scheduler.Immediate)
            .Subscribe(observer);
    });
}
于 2020-11-28T20:15:39.957 回答
0

我认为这可能是正确的答案:

这不是一个好的答案。不使用。

我自己创建的Generate支持初始状态的异步/等待+迭代函数:

    public static IObservable<TResult> Generate<TResult>(
        Func<Task<TResult>> initialState,
        Func<TResult, bool> condition,
        Func<TResult, Task<TResult>> iterate,
        Func<TResult, TResult> resultSelector
        )
    {
        return Observable.Create<TResult>(async obs =>
        {
            var state = await initialState();

            while (condition(state))
            {
                var result = resultSelector(state);
                obs.OnNext(result);
                state = await iterate(state);
            }

            obs.OnCompleted();

            return System.Reactive.Disposables.Disposable.Empty;
        });
    }

不幸的是,这似乎有副作用,即消息的产生远远领先于消费。如果观察者处理消息的速度很慢,那么这将在我们处理少量消息之前获取数百万条消息。不完全是我们想要的服务总线。

我将完成上述工作,也许会阅读更多内容,并在需要时发布更具体的问题。

于 2015-07-17T23:19:53.177 回答