0

我正在构建一个类以使用并行循环来访问消息队列中的消息,为了解释我的问题,我创建了一个简化版本的代码:

public class Worker
{
    private IMessageQueue mq;
    public Worker(IMessageQueue mq)
    {
        this.mq = mq;
    }

    public int Concurrency
    {
        get
        {
            return 5;
        }
    }

    public void DoWork()
    {
        int totalFoundMessage = 0;

        do
        {
            // reset for every loop
            totalFoundMessage = 0;

            Parallel.For<int>(
                0,
                this.Concurrency,
                () => 0,
                (i, loopState, localState) =>
                {
                    Message data = this.mq.GetFromMessageQueue("MessageQueueName");

                    if (data != null)
                    {
                        return localState + 1;
                    }
                    else
                    {
                        return localState + 0;
                    }
                },
                localState =>
                {
                    Interlocked.Add(ref totalFoundMessage, localState);
                });
        }
        while (totalFoundMessage >= this.Concurrency);
    }
}

这个想法是为工人类设置一个并发值来控制并行循环。如果在每个循环之后从消息队列中检索的消息数等于并发数,我假设队列中可能有更多消息并继续从队列中获取,直到消息数小于并发数。TPL 代码也受到TPL 数据并行问题帖子的启发。

我有消息队列和消息对象的接口。

public interface IMessageQueue
{
    Message GetFromMessageQueue(string queueName);
}

public class Message
{
}

因此,我创建了单元测试代码并使用 Moq 来模拟IMessageQueue界面

    [TestMethod()]
    public void DoWorkTest()
    {
        Mock<IMessageQueue> mqMock = new Mock<IMessageQueue>();

        Message data = new Message();

        Worker w = new Worker(mqMock.Object);

        int callCounter = 0;
        int messageNumber = 11;
        mqMock.Setup(x => x.GetFromMessageQueue("MessageQueueName")).Returns(() =>
        {
            callCounter++;
            if (callCounter < messageNumber)
            {
                return data;
            }
            else
            {
                // simulate MSMQ's behavior last call to empty queue returns null
                return (Message)null;
            }
        }
        );

        w.DoWork();

        int expectedCallTimes = w.Concurrency * (messageNumber / w.Concurrency);
        if (messageNumber % w.Concurrency > 0)
        {
            expectedCallTimes += w.Concurrency;
        }

        mqMock.Verify(x => x.GetFromMessageQueue("MessageQueueName"), Times.Exactly(expectedCallTimes));
    }

我使用Moq 的想法来设置基于调用时间的函数返回来设置基于调用时间的响应。

在单元测试过程中,我注意到测试结果不稳定,如果你多次运行它,大多数情况下你会看到测试通过,但偶尔会因为各种原因测试失败。

我不知道是什么导致了这种情况,并寻求您的一些意见。谢谢

4

2 回答 2

1

问题是您的GetFromMessageQueue()模拟不是线程安全的,但是您同时从多个线程调用它。++本质上是线程不安全的操作。

相反,您应该使用锁定或Interlocked.Increment().

此外,在您的代码中,您可能不会从并行性中受益,因为启动和停止Parallel.ForEach()会产生一些开销。更好的方法是在 内部有一个while(或do- whileParallel.ForEach(),而不是相反。

于 2013-02-16T13:30:24.253 回答
0

我的方法是重组。在测试诸如计时或并发之类的事情时,通常谨慎的做法是将您的调用(在本例中是使用 PLINQ)抽象为一个接受多个委托的单独类。然后,您可以测试对新类的正确调用。然后,因为新类要简单得多(只有一个 PLINQ 调用)并且不包含任何逻辑,所以您可以不对其进行测试。

我主张不要在这种情况下进行测试,因为除非你正在研究一些超级关键的东西(生命支持系统、飞机等),否则它会变得比值得测试的麻烦更多。相信框架会按预期执行 PLINQ 查询。您应该只测试那些对测试有意义并且为您的项目或客户提供价值的东西。

于 2013-02-15T17:34:35.243 回答