我正在构建一个类以使用并行循环来访问消息队列中的消息,为了解释我的问题,我创建了一个简化版本的代码:
public class Worker
{
private IMessageQueue mq;
public Worker(IMessageQueue mq)
{
this.mq = mq;
}
public int Concurrency
{
get
{
return 5;
}
}
public void DoWork()
{
int totalFoundMessage = 0;
do
{
// reset for every loop
totalFoundMessage = 0;
Parallel.For<int>(
0,
this.Concurrency,
() => 0,
(i, loopState, localState) =>
{
Message data = this.mq.GetFromMessageQueue("MessageQueueName");
if (data != null)
{
return localState + 1;
}
else
{
return localState + 0;
}
},
localState =>
{
Interlocked.Add(ref totalFoundMessage, localState);
});
}
while (totalFoundMessage >= this.Concurrency);
}
}
这个想法是为工人类设置一个并发值来控制并行循环。如果在每个循环之后从消息队列中检索的消息数等于并发数,我假设队列中可能有更多消息并继续从队列中获取,直到消息数小于并发数。TPL 代码也受到TPL 数据并行问题帖子的启发。
我有消息队列和消息对象的接口。
public interface IMessageQueue
{
Message GetFromMessageQueue(string queueName);
}
public class Message
{
}
因此,我创建了单元测试代码并使用 Moq 来模拟IMessageQueue
界面
[TestMethod()]
public void DoWorkTest()
{
Mock<IMessageQueue> mqMock = new Mock<IMessageQueue>();
Message data = new Message();
Worker w = new Worker(mqMock.Object);
int callCounter = 0;
int messageNumber = 11;
mqMock.Setup(x => x.GetFromMessageQueue("MessageQueueName")).Returns(() =>
{
callCounter++;
if (callCounter < messageNumber)
{
return data;
}
else
{
// simulate MSMQ's behavior last call to empty queue returns null
return (Message)null;
}
}
);
w.DoWork();
int expectedCallTimes = w.Concurrency * (messageNumber / w.Concurrency);
if (messageNumber % w.Concurrency > 0)
{
expectedCallTimes += w.Concurrency;
}
mqMock.Verify(x => x.GetFromMessageQueue("MessageQueueName"), Times.Exactly(expectedCallTimes));
}
我使用Moq 的想法来设置基于调用时间的函数返回来设置基于调用时间的响应。
在单元测试过程中,我注意到测试结果不稳定,如果你多次运行它,大多数情况下你会看到测试通过,但偶尔会因为各种原因测试失败。
我不知道是什么导致了这种情况,并寻求您的一些意见。谢谢