0
AsycSubject<Unit>() sub;
// stuff
if(!sub.HasFired())
  // Do stuff

目前最好的尝试是:

public static bool HasFired<T>(this AsyncSubject<T> sub)
{
    AsyncSubject<bool> ret = new AsyncSubject<bool>();
    sub.Timeout(TimeSpan.FromMilliseconds(20))
        .Subscribe(_ =>
            {
                ret.OnNext(true);
                ret.OnCompleted();
            },
            ex => 
            {
                ret.OnNext(false);
                ret.OnCompleted();
            });
    return ret.First();
}

但是感觉很丑很长。我怀疑我错过了一些简单的东西。有什么建议么?

4

1 回答 1

1

环绕现有AsyncSubject状态并添加所需状态更容易。

public class AsyncSubjectEx<T> : ISubject<T>, IDisposable
{
    AsyncSubject<T> Subject = new AsyncSubject<T>();

    public bool HasValue { get; protected set; }

    public object Gate = new object();

    public void OnCompleted()
    {
        Subject.OnCompleted();
    }

    public void OnError(Exception error)
    {
        Subject.OnError(error);
    }

    public void OnNext(T value)
    {
        lock (Gate)
        {
            Subject.OnNext(value);
            HasValue = true;
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (Gate)
            return Subject.Subscribe(observer);
    }

    public void Dispose()
    {
        Subject.Dispose();
    }
}

具有讽刺意味的是,原件AsyncSubject经过反思表明有一个hasValue场,但它并没有被暴露出来。考虑将此报告给 Rx 团队 - 有时可能会有用。

于 2012-08-11T06:49:38.313 回答