2

我正在尝试使用 CCR 迭代器作为解决需要并行处理大量数据馈送的任务的解决方案,其中来自每个馈送的数据需要按顺序处理。没有一个提要相互依赖,因此可以按每个提要并行处理有序处理。

下面是一个带有一个整数馈送的快速而肮脏的模型,它只是以大约 1.5K/秒的速率将整数推入端口,然后使用 CCR 迭代器将它们拉出以保持按顺序处理的保证。

class Program
{
    static Dispatcher dispatcher = new Dispatcher();
    static DispatcherQueue dispatcherQueue = 
       new DispatcherQueue("DefaultDispatcherQueue", dispatcher);
    static Port<int> intPort = new Port<int>();

    static void Main(string[] args)
    {
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromIteratorHandler(new IteratorHandler(ProcessInts)));

        int counter = 0;
        Timer t = new Timer( (x) => 
            { for(int i = 0; i < 1500; ++i) intPort.Post(counter++);}
              , null, 0, 1000);

        Console.ReadKey();
    }

    public static IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue;
            if( (currentValue = intPort) % 1000 == 0)
            {
                Console.WriteLine("{0}, Current Items In Queue:{1}", 
                  currentValue, intPort.ItemCount);
            }
        }
    }
}

令我非常惊讶的是,CCR 无法跟上 Corei7 机器,队列大小无限增长。在另一项测试中,在负载或 ~100 Post/sec. 下测量从 Post() 到 Receive() 的延迟,每批中第一个 Post() 和 Receive() 之间的延迟约为 1ms。

我的模型有问题吗?如果是这样,使用 CCR 执行此操作的更好方法是什么?

4

1 回答 1

1

是的,我同意,这确实看起来很奇怪。您的代码最初似乎运行平稳,但在几千个项目之后,处理器使用率上升到性能确实乏善可陈的地步。这让我感到不安,并暗示了框架中的一个问题。在玩了你的代码之后,我真的无法确定为什么会这样。我建议将此问题带到Microsoft 机器人论坛,看看您是否可以让 George Chrysanthakopoulos(或其他 CCR 大脑之一)告诉您问题所在。但是,我可以推测您的代码效率非常低。

您处理从端口“弹出”项目的方式非常低效。本质上,每当端口中有一条消息时,迭代器就会被唤醒,它只处理一条消息(尽管端口中可能还有数百条消息),然后在将yield控制权传递回框架时挂起。在产生的接收器导致迭代器再次“唤醒”时,许多消息已经填满了端口。从 Dispatcher 中拉出一个线程来处理单个项目(同时许多项目已经堆积起来)几乎肯定不是获得良好吞吐量的最佳方式。

我已经修改了您的代码,以便在屈服之后,我们检查端口以查看是否有任何进一步的消息排队并处理它们,从而在我们屈服回框架之前完全清空端口。我还对您的代码进行了一些重构,以CcrServiceBase简化您正在执行的某些任务的语法:

internal class Test:CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;
    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest() {
        SpawnIterator(ProcessInts);
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 1500; ++i)
                                  intPort.Post(counter++);
                          }
                          ,
                          null,
                          0,
                          1000);
    }

    public IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue = intPort;
            ReportCurrent(currentValue);
            while(intPort.Test(out currentValue))
            {
                ReportCurrent(currentValue);
            }
        }
    }

    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

或者,您可以完全取消迭代器,因为它在您的示例中并没有很好地使用(尽管我不完全确定这对处理顺序有什么影响):

internal class Test : CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;

    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest()
    {
        Activate(
            Arbiter.Receive(true,
                            intPort,
                            i =>
                            {
                                ReportCurrent(i);
                                int currentValue;
                                while (intPort.Test(out currentValue))
                                {
                                    ReportCurrent(currentValue);
                                }
                            }));
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 500000; ++i)
                              {
                                  intPort.Post(counter++);
                              }
                          }
                          ,
                          null,
                          0,
                          1000);
    }



    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

这两个示例都将吞吐量显着提高了几个数量级。希望这可以帮助。

于 2011-04-06T22:16:58.460 回答