1

我必须确保完成一个 Observable。它正在观察流数据,这不是我要修改的重点。流数据可能根本不打勾。我遇到了一个主题成员,我可以用它来手动完成 observable。

private readonly Subject<bool> _mySubject = new Subject<bool>();

// relevant property that should be completed
public IObservable<myModel> StreamObservable { get; private set; }

IObservable<myModel> someDataObservable = ... ; // may not tick in scenario

// this doesn't work            
StreamObservable = someDataObservable
                .TakeUntil(_mySubject)
                .Finally(() => logger.LogInformation("completed!"));
_mySubject.OnNext(true);

// I tried Amb.. still not going into Finally(), also when someDataObservable doesn't tick
StreamObservable = Observable.Amb(_mySubject.SelectMany(value => Observable.Empty<myModel>()), someDataObservable)
                .Finally(() => logger.LogInformation("completed!"));
_mySubject.OnNext(true);

我怎样才能完成,最好使用 Subject.OnNext() ?

主题触发器只是一个想法。我的偏好是留在观察世界并将序列转换为 Observable.Empty(),例如当调用这样的回调时:

private Task<bool> FinishStreamCallback()
{
    _mySubject.OnNext(true); // my first approach
    return Task.FromResult(true);
}

..targeting 使用 OnCompleted()-订阅另一个类中的相关公共 observable。

4

1 回答 1

1

使用这样的主题并不会结束可观察的内容。它不像 observable 的来源叫做OnCompleted. UsingTakeUntil导致源 observable 的订阅被释放。

就像这样的代码:

var source = new Subject<int>();
var subscription = source.Materialize().Subscribe(x => Console.WriteLine(x.Kind));
source.OnNext(1);
subscription.Dispose();

这仅显示OnNext.

如果我运行这个:

var source = Observable.Return(42);
var subscription = source.Materialize().Subscribe(x => Console.WriteLine(x.Kind));

然后我得到:

OnNext
已完成

.Dispose()显式或通过调用TakeUntil会导致源 observable 停止在它所在的位置。没有OnCompleted产生。

如果您想知道何时取消订阅 observable,请尝试以下扩展方法:

public static IObservable<T> OnUnsubscribe<T>(this IObservable<T> source, Action unsubscribe) =>
    Observable
        .Create<T>(o =>
            new CompositeDisposable(
                source.Subscribe(o),
                Disposable.Create(unsubscribe)));
                

尝试像这样使用它:

var source = new Subject<int>();
var trigger = new Subject<Unit>();
var subscription =
    source
        .Materialize()
        .OnUnsubscribe(() => Console.WriteLine("Unsubscribed!"))
        .TakeUntil(trigger)
        .Subscribe(x => Console.WriteLine(x.Kind));
        
source.OnNext(1);
trigger.OnNext(Unit.Default);

这给了我:

OnNext
退订!
于 2020-06-29T01:36:35.507 回答