2

我有一个随机失败的单元测试,我无法解释。这涉及使用 Rx.NET 的可观察序列和我为转换序列而制作的扩展方法。首先,让我展示一下测试是如何失败的:

Machine.Specifications.SpecificationException:预期:System.Collections.Generic.List`1 [System.Int32]:
{
  [8],
  [10],
  [11]
}

  但是是:System.Collections.Generic.List`1[System.Int32]:
{
  [8],
  [10],
  [11],
  [8],
  [10],
  [11]
}

好的,你看,我得到整个序列两次而不是一次。这是测试:

[Subject(typeof(ObservableExtensions), "Shutter Current Readings")]
internal class when_a_shutter_current_reading_is_received
    {
    Establish context = () => source = "Z8\nZ10\nZ11\n".ToObservable();
    Because of = () => source
        .ShutterCurrentReadings().Trace("Unbelievable")
        .SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
    It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
    static List<int> elementHistory = new List<int>();
    static List<int> expectedElements = new List<int> {8, 10, 11};
    static IObservable<char> source;

    }

SubscribeAndWaitForCompletion()是一个扩展方法,定义如下:

public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
    {
    var sequenceComplete = new ManualResetEvent(false);
    var subscription = sequence.Subscribe(
        onNext: observer,
        onCompleted: () => sequenceComplete.Set()
        );
    sequenceComplete.WaitOne();
    subscription.Dispose();
    sequenceComplete.Dispose();
    }

您会注意到那里有一个.Trace()调用,扩展方法内部还有另一个调用,这会通过 NLog 生成有关可观察序列的日志记录,这是跟踪输出:

20:43:43.1547|调试|c__DisplayClass0_1`1|难以置信[1]:订阅()
20:43:43.1547|调试|c__DisplayClass0_1`1|ShutterCurrent[1]:订阅()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(8)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|难以置信[1]:OnNext(8)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(10)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|难以置信[1]:OnNext(10)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(11)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|难以置信[1]:OnNext(11)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnCompleted()
20:43:43.1547|调试|c__DisplayClass0_1`1|难以置信[1]:OnCompleted()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|难以置信[1]:Dispose()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Dispose()
孩子测试失败

这几乎是我所期望的。我从我的扩展方法内部得到一个跟踪输出,然后在扩展方法外部的转换序列上得到另一个。正如预期的那样,序列中的每个元素恰好流过系统一次。然而,我在测试中两次捕获了整个序列。

我最好提供扩展方法,以便我们可以看到它的作用。这里是:

    public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
        {
        const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
        var shutterCurrentRegex =
            new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
        var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
        var shutterCurrentValues = from buffer in buffers
                                   let message = new string(buffer.ToArray())
                                   let patternMatch = shutterCurrentRegex.Match(message)
                                   where patternMatch.Success
                                   let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
                                   select shutterCurrent;
        return shutterCurrentValues.Trace("ShutterCurrent");
        }

所以这里的目的是从数据流中挑选出当前传感器的读数。读数采用 Znn 格式(字面“Z”后跟一个或两个十进制数字后跟一个换行符。扩展方法将原始输入字符序列转换为表示当前读数的整数序列。过滤器使用 RxBuffer运算符来缓冲它认为可能是有效的传感器读数的字符。当看到“Z”字符时打开缓冲区,当看到非数字字符时关闭缓冲区。通过在正则表达式中匹配和解析进行双重检查,然后如果result 传递它转换为整数并在输出序列中发出的所有内容。

谁能明白为什么我的结果中可能会出现双重数据?

更新:与调查相关的附加代码。

    public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
        Predicate<char> bufferOpening, Predicate<char> bufferClosing)
        {
        return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
        }

Trace扩展方法可在 NuGet 包TA.ASCOM.ReactiveCommunications(我的一个)中找到,但来源如下:

    public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
        {
        var log = LogManager.GetLogger(name);
        var id = 0;
        return Observable.Create<TSource>(observer =>
            {
            var idClosure = ++id;
            Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
            trace("Subscribe", "");
            var disposable = source.Subscribe(
                v =>
                    {
                    trace("OnNext", v);
                    observer.OnNext(v);
                    },
                e =>
                    {
                    trace("OnError", "");
                    observer.OnError(e);
                    },
                () =>
                    {
                    trace("OnCompleted", "");
                    observer.OnCompleted();
                    });
            return () =>
                {
                trace("Dispose", "");
                disposable.Dispose();
                };
            });
        }

我怀疑我可能从其他人那里复制了这段代码,但我似乎没有记下谁。

4

1 回答 1

1

编辑

这是在 LinqPad 中模拟问题的一种方法,无需使用 MSpec/NChrunch (?) 运行器:

void Main()
{
    //static initializers
    List<int> expectedElements = new List<int> { 8, 10, 11 };
    List<int> elementHistory = new List<int>();
    IObservable<char> source;

    //simulated continuous running of MSpec test
    for (int i = 0; i < 20; i++)
    {

        //establish
        source = "Z8\nZ10\nZ11\n".ToObservable();

        //because
        source
            .ShutterCurrentReadings()
            .Trace("Unbelievable")
            .SubscribeAndWaitForCompletion(item => elementHistory.Add(item));

        //it
        elementHistory.Dump(i.ToString()); //Linqpad
        if(elementHistory.Count > 3)
            throw new Exception("Assert.ShouldNotHappen");
    }
}

public static class Extensions
{
    public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
    {
        const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
        var shutterCurrentRegex =
            new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
        var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
        var shutterCurrentValues = from buffer in buffers
                                   let message = new string(buffer.ToArray())
                                   let patternMatch = shutterCurrentRegex.Match(message)
                                   where patternMatch.Success
                                   let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
                                   select shutterCurrent;
        return shutterCurrentValues.Trace("ShutterCurrent");
    }

    public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
    {
        var sequenceComplete = new ManualResetEvent(false);
        var subscription = sequence.Subscribe(
            onNext: observer,
            onCompleted: () => sequenceComplete.Set()
            );
        sequenceComplete.WaitOne();
        subscription.Dispose();
        sequenceComplete.Dispose();
    }

    public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
    {
        var log = LogManager.GetLogger(name);
        var id = 0;
        return Observable.Create<TSource>(observer =>
            {
                var idClosure = ++id;
                Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
                trace("Subscribe", "");
                var disposable = source.Subscribe(
                    v =>
                        {
                            trace("OnNext", v);
                            observer.OnNext(v);
                        },
                    e =>
                        {
                            trace("OnError", "");
                            observer.OnError(e);
                        },
                    () =>
                        {
                            trace("OnCompleted", "");
                            observer.OnCompleted();
                        });
                return () =>
                    {
                        trace("Dispose", "");
                        disposable.Dispose();
                    };
            });
    }

    public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
        Predicate<char> bufferOpening, Predicate<char> bufferClosing)
    {
        return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
    }
}

这失败了,就像你的场景一样。

我修复它的最佳建议是将初始化elementHistory移至Establish步骤。您还可以将source变量从建立中移开,因此您的测试将如下所示:

internal class when_a_shutter_current_reading_is_received
{
    Establish context = () => elementHistory = new List<int>();
    Because of = () => "Z8\nZ10\nZ11\n".ToObservable()
        .ShutterCurrentReadings()
        .Trace("Unbelievable")
        .SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
    It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
    static List<int> elementHistory;
    static List<int> expectedElements = new List<int> { 8, 10, 11 };

}

您可能还想查看Microsoft.Reactive.Testing哪个对 Rx 查询提供了更强大的测试,尽管它不会像您的测试那样简单。


老答案:

Trace由于缺少,ShouldEqualBufferByPredicates函数,我无法编译您的代码。如果它们来自外部来源,请记录在哪里。

我猜问题源于BufferByPredicates实施,Trace实施,缺少ConnectafterPublish或 static elementHistory

我最好的猜测是静态的elementHistory:如果该测试同时运行两次,你就有一个竞争条件,你可能会得到双重结果(Establish运行两次,然后Because运行两次,然后It会失败)。


于 2018-03-22T01:07:14.140 回答