1

我使用 Switch 语句的 Rx 订阅有问题。

_performSearchSubject
            .AsObservable()
            .Select(_ => PerformQuery())
            .Switch()
            .ObserveOn(_synchronizationContextService.SynchronizationContext)
            .Subscribe(DataArrivedForPositions, PositionQueryError, PositionQueryCompleted)
            .DisposeWith(this);

流程是:

  1. 一些属性发生变化并且调用 performSearchSubject.OnNext
  2. 调用 PerformPositionQuery(),每次命中都会返回一个观察者
  3. 通过此观察者响应的服务在数据接收完成时调用 OnNext 两次和 OnCompleted 一次
  4. 方法 DataArrivedForPositions 被按预期调用两次
  5. 永远不会调用方法 PositionQueryCompleted,尽管在我的数据服务中调用了observer.OnCompleted()。

数据服务的代码是:

        protected override void Request(Request request, IObserver<Response> observer)
        {
            query.Arrive += p => QueryReceive(request.RequestId, p, observer, query);
            query.Error += (type, s, message) => QueryError(observer, message);
            query.NoMoreData += id => QueryCompleted(observer);

            query.Execute(request);
        }

        private void QueryError(IObserver<PositionSheetResponse> observer, string message)
        {
            observer.OnError(new Exception(message));
        }

        private void QueryCompleted(IObserver<PositionSheetResponse> observer)
        {
            observer.OnCompleted();
        }

        private void QueryReceive(Guid requestId, Qry0079Receive receiveData, IObserver<PositionSheetResponse> observer, IQry0079PositionSheet query)
        {
            observer.OnNext(ConvertToResponse(requestId, receiveData));
        }
4

1 回答 1

3

Switch结果只会在您的外部可观察对象 ( _performSearchSubject) 完成时完成。我假设在你的情况下这个永远不会(它可能绑定到执行搜索的用户操作)。

不清楚的是您预计何时PositionQueryCompleted会被调用。如果在处理完每个成功的查询之后,则需要修改您的流,因为Switch您丢失了查询流完成的信息,但它也缺少有关 UI 的信息(甚至错误的调度程序)来说明其数据是否实际处理。

可能有其他方法可以实现它,但基本上你希望你的查询流完整地存活下来Switch目前忽略这个事件)。例如,您可以将您的查询流转换为具有 n+1 个事件,并额外增加一个事件:

    _performSearchSubject
        .AsObservable()
        .Select(_ => 
                  PerformQuery()
                  .Select(Data => new { Data, Complete = false})
                  .Concat(Observable.Return(new { Data = (string)null, Complete = true })))

您可以安全地申请.Switch().ObserveOn(_synchronizationContextService.SynchronizationContext)它,但随后您需要修改您的订阅:

    .Subscribe(data => {
        if (data.Complete) DataArrivedForPositions(data.Data);
        else PositionQueryCompleted()
    }, PositionQueryError)
于 2015-12-28T15:40:39.617 回答