我使用 Switch 语句的 Rx 订阅有问题。
_performSearchSubject
.AsObservable()
.Select(_ => PerformQuery())
.Switch()
.ObserveOn(_synchronizationContextService.SynchronizationContext)
.Subscribe(DataArrivedForPositions, PositionQueryError, PositionQueryCompleted)
.DisposeWith(this);
流程是:
- 一些属性发生变化并且调用 performSearchSubject.OnNext
- 调用 PerformPositionQuery(),每次命中都会返回一个观察者
- 通过此观察者响应的服务在数据接收完成时调用 OnNext 两次和 OnCompleted 一次
- 方法 DataArrivedForPositions 被按预期调用两次
- 永远不会调用方法 PositionQueryCompleted,尽管在我的数据服务中调用了observer.OnCompleted()。
数据服务的代码是:
protected override void Request(Request request, IObserver<Response> observer)
{
query.Arrive += p => QueryReceive(request.RequestId, p, observer, query);
query.Error += (type, s, message) => QueryError(observer, message);
query.NoMoreData += id => QueryCompleted(observer);
query.Execute(request);
}
private void QueryError(IObserver<PositionSheetResponse> observer, string message)
{
observer.OnError(new Exception(message));
}
private void QueryCompleted(IObserver<PositionSheetResponse> observer)
{
observer.OnCompleted();
}
private void QueryReceive(Guid requestId, Qry0079Receive receiveData, IObserver<PositionSheetResponse> observer, IQry0079PositionSheet query)
{
observer.OnNext(ConvertToResponse(requestId, receiveData));
}