8

Observable.TakeWhile 允许您在条件为真时运行序列(使用委托以便我们可以对实际序列对象执行计算),但它会在每个元素之前检查此条件。如何在每个元素之后执行相同的检查?

下面的代码演示了这个问题

    void RunIt()
    {
        List<SomeCommand> listOfCommands = new List<SomeCommand>();
        listOfCommands.Add(new SomeCommand { CurrentIndex = 1, TotalCount = 3 });
        listOfCommands.Add(new SomeCommand { CurrentIndex = 2, TotalCount = 3 });
        listOfCommands.Add(new SomeCommand { CurrentIndex = 3, TotalCount = 3 });

        var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex != c.TotalCount);

        obs.Subscribe(x =>
        {
            Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount);
        });
    }

    class SomeCommand
    {
        public int CurrentIndex;
        public int TotalCount;
    }

这输出

1 of 3
2 of 3

我无法获得第三个元素

看这个例子,你可能会认为我所要做的就是像这样改变我的条件 -

var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex <= c.TotalCount);

但是,observable 永远不会完成(因为在我的真实世界代码中,流不会在这三个命令之后结束)

4

6 回答 6

15

没有内置运算符可以执行您的要求,但这里有一个用于Publish运行两个查询,同时只订阅底层 observable 一次:

// Emits matching values, but includes the value that failed the filter
public static IObservable<T> TakeWhileInclusive<T>(
    this IObservable<T> source, Func<T, bool> predicate)
{
    return source.Publish(co => co.TakeWhile(predicate)
        .Merge(co.SkipWhile(predicate).Take(1)));
}

接着:

var obs = listOfCommands.ToObservable()
    .TakeWhileInclusive(c.CurrentIndex != c.TotalCount);
于 2013-02-05T03:11:10.153 回答
6

最终编辑:

我的解决方案基于此线程中 Sergey 的 TakeWhileInclusive 实现 - 如何根据事件中的条件完成 Rx Observable

public static IObservable<TSource> TakeUntil<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    return Observable
        .Create<TSource>(o => source.Subscribe(x =>
        {
            o.OnNext(x);
            if (predicate(x))
                o.OnCompleted();
        },
        o.OnError,
        o.OnCompleted
    ));
}
于 2013-02-12T21:15:14.153 回答
2

您可以使用TakeUntil运算符获取每个项目,直到辅助来源产生值;在这种情况下,我们可以将第二个流指定为谓词通过后的第一个值:

public static IObservable<TSource> TakeWhileInclusive<TSource>(
    this IObservable<TSource> source,
    Func<TSource, bool> predicate)
{
    return source.TakeUntil(source.SkipWhile(x => predicate(x)).Skip(1));
}
于 2013-02-05T12:32:05.817 回答
1

我认为你追求的是TakeWhile,​​而不是TakeUntil

var list = (new List<int>(){1,2,3,4,5,6,7,8,9,10});
var takeWhile = list
        .ToObservable()
        .Select((_, i) => Tuple.Create(i, _))
        .TakeWhile(tup => tup.Item1 < list.Count)
        .Do(_ => Console.WriteLine("Outputting {0}", _.Item2));

好的,您想要的东西不存在开箱即用,至少我不知道具有那种特定语法的东西。也就是说,你可以很容易地把它拼凑起来(而且它不是讨厌):

var fakeCmds = Enumerable
    .Range(1, 100)
    .Select(i => new SomeCommand() {CurrentIndex = i, TotalCount = 10})
    .ToObservable();

var beforeMatch = fakeCmds
    .TakeWhile(c => c.CurrentIndex != c.TotalCount);
var theMatch = fakeCmds
    .SkipWhile(c => c.CurrentIndex != c.TotalCount)
    .TakeWhile(c => c.CurrentIndex == c.TotalCount);
var upToAndIncluding = Observable.Concat(beforeMatch, theMatch);
于 2013-02-05T01:47:30.077 回答
0

组合,使用新的SkipUntiland TakeUntil

跳过直到 return source.Publish(s => s.SkipUntil(s.Where(predicate)));

TakeUntil (含) return source.Publish(s => s.TakeUntil(s.SkipUntil(predicate)));

完整来源https ://gist.github.com/GeorgeTsiokos/a4985b812c4048c428a981468a965a86

于 2017-02-13T20:41:21.757 回答
-1

也许以下方式对某人有用。您必须使用“Do”方法和空的“Subscribe”方法。

    listOfCommands.ToObservable()
    .Do(x =>
    {
        Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount);
    })
    .TakeWhile(c => c.CurrentIndex != c.TotalCount)
    .Subscribe();

这样,您无需编写自己的扩展即可获得结果。

于 2020-01-01T13:31:21.190 回答