我正在尝试使用 CCR 迭代器作为解决需要并行处理大量数据馈送的任务的解决方案,其中来自每个馈送的数据需要按顺序处理。没有一个提要相互依赖,因此可以按每个提要并行处理有序处理。
下面是一个带有一个整数馈送的快速而肮脏的模型,它只是以大约 1.5K/秒的速率将整数推入端口,然后使用 CCR 迭代器将它们拉出以保持按顺序处理的保证。
class Program
{
static Dispatcher dispatcher = new Dispatcher();
static DispatcherQueue dispatcherQueue =
new DispatcherQueue("DefaultDispatcherQueue", dispatcher);
static Port<int> intPort = new Port<int>();
static void Main(string[] args)
{
Arbiter.Activate(
dispatcherQueue,
Arbiter.FromIteratorHandler(new IteratorHandler(ProcessInts)));
int counter = 0;
Timer t = new Timer( (x) =>
{ for(int i = 0; i < 1500; ++i) intPort.Post(counter++);}
, null, 0, 1000);
Console.ReadKey();
}
public static IEnumerator<ITask> ProcessInts()
{
while (true)
{
yield return intPort.Receive();
int currentValue;
if( (currentValue = intPort) % 1000 == 0)
{
Console.WriteLine("{0}, Current Items In Queue:{1}",
currentValue, intPort.ItemCount);
}
}
}
}
令我非常惊讶的是,CCR 无法跟上 Corei7 机器,队列大小无限增长。在另一项测试中,在负载或 ~100 Post/sec. 下测量从 Post() 到 Receive() 的延迟,每批中第一个 Post() 和 Receive() 之间的延迟约为 1ms。
我的模型有问题吗?如果是这样,使用 CCR 执行此操作的更好方法是什么?