5

我想知道是否可以优化以下代码以更快地执行。我目前似乎在一个非常简单的数据流结构上每秒最多可以处理 140 万条简单消息。我知道这个示例过程同步传递/转换消息,但是,我目前测试 TPL Dataflow 作为我自己的基于任务和并发集合的自定义解决方案的可能替代品。我知道术语“并发”已经建议我并行运行,但为了当前的测试目的,我通过同步在自己的解决方案上推送消息,每秒大约有 510 万条消息。我在这里缺少什么,我读到 TPL 数据流被推为高吞吐量、低延迟的解决方案,但到目前为止我必须忽略性能调整。谁能指出我正确的方向?

class TPLDataFlowExperiments
{
    public TPLDataFlowExperiments()
    {
        var buf1 = new BufferBlock<int>();

        var transform = new TransformBlock<int, string>(t =>
            {
                return "";
            });

        var action = new ActionBlock<string>(s =>
            {
                //Thread.Sleep(100);
                //Console.WriteLine(s);
            });

        buf1.LinkTo(transform);
        transform.LinkTo(action);

        //Propagate all Completions down the flow
        buf1.Completion.ContinueWith(t =>
        {
            transform.Complete();
            transform.Completion.ContinueWith(u =>
            {
                action.Complete();
            });
        });

        Stopwatch watch = new Stopwatch();
        watch.Start();

        int cap = 10000000;
        for (int i = 0; i < cap; i++)
        {
            buf1.Post(i);
        }

        //Mark Buffer as Complete
        buf1.Complete();

        action.Completion.ContinueWith(t =>
            {
                watch.Stop();

                Console.WriteLine("All Blocks finished processing");
                Console.WriteLine("Units processed per second: " + cap / watch.ElapsedMilliseconds * 1000);
            });

        Console.ReadLine();
    }
}
4

4 回答 4

9

我认为这主要归结为一件事:您的测试几乎毫无意义。所有这些块都应该做一些事情,并使用多个内核和异步操作来做到这一点。

此外,在您的测试中,可能会花费大量时间进行同步。使用更真实的代码,代码将需要一些时间来执行,因此争用会更少,因此实际开销会比您测量的要小。

但是要真正回答您的问题,是的,您忽略了一些性能调整。具体来说,SingleProducerConstrained,这意味着可以使用具有较少锁定的数据结构。如果我在两个块上都使用它(BufferBlock这里完全没用,你可以安全地删除它),我的计算机上的速率从每秒大约 3-4 百万个项目提高到超过 500 万个。

于 2012-06-22T13:07:52.220 回答
2

为了增加 svick 的答案,该测试仅对单个操作块使用单个处理线程。这样,它只测试使用块的开销。

DataFlow 的工作方式类似于 F# 代理、Scala 角色和 MPI 实现。每个动作块一次执行一个任务,监听输入并产生输出。加速是通过分步分解算法来提供的,该算法可以在多个内核上独立执行,只相互传递消息。

虽然您可以增加并发任务的数量,但最重要的问题是设计一个独立于其他步骤执行最大数量步骤的流程。

于 2012-06-22T14:21:52.123 回答
0

您还可以增加数据流块的并行度。这可能会提供额外的加速,并且如果您发现其中一个块充当其余块的瓶颈,还可以帮助线性任务之间的负载平衡。

于 2013-12-03T21:17:25.660 回答
0

如果您的工作负载非常精细,以至于您希望每秒处理数百万条消息,那么由于相关的开销,通过管道传递单个消息变得不可行。您需要通过将消息批处理到数组或列表来分块化工作负载。例如:

var transform = new TransformBlock<int[], string[]>(batch =>
{
    var results = new string[batch.Length];
    for (int i = 0; i < batch.Length; i++)
    {
        results[i] = ProcessItem(batch[i]);
    }
    return results;
});

对于批量输入,您可以使用System.InteractiveBatchBlock包中的或“linqy”Buffer扩展方法,或包中的类似功能方法,或者手动执行。BatchMoreLinq

于 2020-06-11T11:42:03.357 回答