6

我正在编写一个函数来检索有关某个主题的新闻并通过 IObservable 返回值反馈此新闻。

但是,我有几个新闻来源。我不想Merge用来将这些来源合并为一个。相反,我想做的是按优先级排序——

  1. 当我的函数被调用时,第一个新闻源被查询(它产生一个代表那个源的 IObservable)。
  2. 如果该新闻源的 IObservable 完成但没有返回任何结果,则查询下一个新闻源。
  3. 如果第二个来源完成但没有返回结果,则查询最终新闻来源。
  4. 这整个行为被包装成一个我可以返回给用户的 observable。

这种行为是我可以使用内置的 Rx 扩展方法完成的,还是我需要实现一个自定义类来处理这个?我将如何做呢?

4

5 回答 5

3

在我看来,接受的答案是不可取的,因为它使用Subject, Do, 并且当第一个序列不为空时仍然订阅第二个序列。如果第二个 observable 调用任何不平凡的东西,后者可能是一个大问题。我想出了以下解决方案:

public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> @this, IObservable<T> switchTo)
{
    if (@this == null) throw new ArgumentNullException(nameof(@this));
    if (switchTo == null) throw new ArgumentNullException(nameof(switchTo));
    return Observable.Create<T>(obs =>
    {
        var source = @this.Replay(1);
        var switched = source.Any().SelectMany(any => any ? Observable.Empty<T>() : switchTo);
        return new CompositeDisposable(source.Concat(switched).Subscribe(obs), source.Connect());
    });
}

该名称SwitchIfEmpty符合现有的 RxJava 实现是关于将一​​些 RxJava 运算符合并到 RxNET 中的持续讨论。

我确信自定义IObservable实现会比我的更有效。你可以在这里找到由 ReactiveX 成员akarnokd编写的。它也可以在NuGet上使用。

于 2018-08-10T23:47:23.540 回答
1

从原始海报编辑:

我接受了这个答案,但把它变成了一种扩展方法——

/// <summary> Returns the elements of the first sequence, or the values in the second sequence if the first sequence is empty. </summary>
/// <param name="first"> The first sequence. </param>
/// <param name="second"> The second sequence. </param>
/// <typeparam name="T"> The type of elements in the sequence. </typeparam>
/// <returns> The <see cref="IObservable{T}"/> sequence. </returns>
public static IObservable<T> DefaultIfEmpty<T>(this IObservable<T> first, IObservable<T> second)
{
    var signal = new AsyncSubject<Unit>();
    var source1 = first.Do(item => { signal.OnNext(Unit.Default); signal.OnCompleted(); });
    var source2 = second.TakeUntil(signal);

    return source1.Concat(source2); // if source2 is cold, it won't invoke it until source1 is completed
}

原答案:

这可能会奏效。

var signal1 = new AsyncSubject<Unit>();
var signal2 = new AsyncSubject<Unit>();
var source1 = a.Do(item => { signal1.onNext(Unit.Default); signal1.onCompleted(); });
var source2 = b.Do(item => { signal2.onNext(Unit.Default); signal2.onCompleted(); })).TakeUntil(signal1);
var source3 = c.TakeUntil(signal2.Merge(signal1));

return Observable.Concat(source1, source2, source3);

编辑:哎呀,第二个源需要一个单独的信号,第三个不需要发出任何信号。Edit2:哎呀......类型。我习惯了 RxJs :)

PS 还有更少的 RX-y 方法可以做到这一点,可能会少一点打字:

var gotResult = false;
var source1 = a();
var source2 = Observable.Defer(() => return gotResult ? Observable.Empty<T>() : b());
var source3 = Observable.Defer(() => return gotResult ? Observable.Empty<T>() : c());
return Observable.Concat(source1, source2, source3).Do(_ => gotResult = true;);
于 2013-03-04T20:03:19.597 回答
1

听起来您可以只使用一个普通的Amb查询。

编辑:根据评论,Amb不会这样做 - 试一试:

public static IObservable<T> SwitchIfEmpty<T>(
     this IObservable<T> first, 
     Func<IObservable<T>> second)
{
    return first.IsEmpty().FirstOrDefault() ? second() : first;
}

试验台:

static Random r = new Random();
public IObservable<string> GetSource(string sourceName)
{
    Console.WriteLine("Source {0} invoked", sourceName);
    return r.Next(0, 10) < 5 
        ? Observable.Empty<string>() 
        : Observable.Return("Article from " + sourceName);
}

void Main()
{
    var query = GetSource("A")
        .SwitchIfEmpty(() => GetSource("B"))
        .SwitchIfEmpty(() => GetSource("C"));

    using(query.Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
    }           
}

一些示例运行:

Source A invoked
Article from A

Source A invoked
Source B invoked
Article from B

Source A invoked
Source B invoked
Source C invoked
Article from C

编辑编辑:

我想您也可以将其概括为:

public static IObservable<T> SwitchIf<T>(
    this IObservable<T> first, 
    Func<IObservable<T>, IObservable<bool>> predicate, 
    Func<IObservable<T>> second)
{
    return predicate(first).FirstOrDefault() 
        ? second() 
        : first;
}
于 2013-03-04T20:21:12.497 回答
1

另一种方法 - 与其他方法的差异相当大,所以我会提出一个新的答案:

这里有各种有趣的调试行:

public static IObservable<T> FirstWithValues<T>(this IEnumerable<IObservable<T>> sources)
{
    return Observable.Create<T>(obs =>
    {
        // these are neat - if you set it's .Disposable field, and it already
        // had one in there, it'll auto-dispose it
        SerialDisposable disp = new SerialDisposable();
        // this will trigger our exit condition
        bool hadValues = false;
        // start on the first source (assumed to be in order of importance)
        var sourceWalker = sources.GetEnumerator();
        sourceWalker.MoveNext();

        IObserver<T> checker = null;
        checker = Observer.Create<T>(v => 
            {
                // Hey, we got a value - pass to the "real" observer and note we 
                // got values on the current source
                Console.WriteLine("Got value on source:" + v.ToString());
                hadValues = true;
                obs.OnNext(v);
            },
            ex => {
                // pass any errors immediately back to the real observer
                Console.WriteLine("Error on source, passing to observer");
                obs.OnError(ex);
            },
            () => {
                // A source completed; if it generated any values, we're done;                    
                if(hadValues)
                {
                    Console.WriteLine("Source completed, had values, so ending");
                    obs.OnCompleted();
                }
                // Otherwise, we need to check the next source in line...
                else
                {
                    Console.WriteLine("Source completed, no values, so moving to next source");
                    sourceWalker.MoveNext();
                    disp.Disposable = sourceWalker.Current.Subscribe(checker);
                }
            });
        // kick it off by subscribing our..."walker?" to the first source
        disp.Disposable = sourceWalker.Current.Subscribe(checker);
        return disp.Disposable;
    });
}

用法:

var query = new[]
{
    Observable.Defer(() => GetSource("A")), 
    Observable.Defer(() => GetSource("B")), 
    Observable.Defer(() => GetSource("C")), 
}.FirstWithValues();

输出:

Source A invoked
Got value on source:Article from A
Article from A
Source completed, had values, so ending

Source A invoked
Source completed, no values, so moving to next source
Source B invoked
Got value on source:Article from B
Article from B
Source completed, had values, so ending

Source A invoked
Source completed, no values, so moving to next source
Source B invoked
Source completed, no values, so moving to next source
Source C invoked
Got value on source:Article from C
Article from C
Source completed, had values, so ending
于 2013-03-04T23:07:35.833 回答
0

这是 JerKimball 运算符的非阻塞版本SwitchIfEmpty

/// <summary>Returns the elements of the first sequence, or the elements of the
/// second sequence if the first sequence is empty.</summary>
public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> first,
    IObservable<T> second)
{
    return Observable.Defer(() =>
    {
        bool isEmpty = true;
        return first
            .Do(_ => isEmpty = false)
            .Concat(Observable.If(() => isEmpty, second));
    });
}

这是同一运算符的一个版本,它接受多个序列,并返回第一个非空序列的元素:

/// <summary>Returns the elements of the first non-empty sequence.</summary>
public static IObservable<T> SwitchIfEmpty<T>(params IObservable<T>[] sequences)
{
    return Observable.Defer(() =>
    {
        bool isEmpty = true;
        return sequences
            .Select(s => s.Do(_ => isEmpty = false))
            .Select(s => Observable.If(() => isEmpty, s))
            .Concat();
    });
}

Observable.Defer运算符用于防止多个订阅共享相同的bool isEmpty状态(有关此的更多信息here)。

于 2020-12-05T09:43:01.357 回答