0

这是我的场景。我从外部数据源获取大量数据,我必须在两个地方本地写入。其中一个目的地的写入速度非常慢,但另一个目的地非常快(但我不能依靠它来读取和写入缓慢的目的地)。为此,我使用了生产者-消费者模式(使用 BlockingCollection)。

我现在遇到的问题是我必须在两个 BlockingCollection 中对数据进行排队,这会占用太多内存。我的代码看起来与下面的示例非常相似,但我真的很想从一个队列中驱动两个任务。有谁知道这样做的正确方法是什么?下面的代码效率低下吗?

class Program
{
    const int MaxNumberOfWorkItems = 15;
    static BlockingCollection<int> slowBC = new BlockingCollection<int>(MaxNumberOfWorkItems);
    static BlockingCollection<int> fastBC = new BlockingCollection<int>(MaxNumberOfWorkItems);

    static void Main(string[] args)
    {
        Task slowTask = Task.Factory.StartNew(() =>
        {
            foreach (var item in slowBC.GetConsumingEnumerable())
            {
                Console.WriteLine("SLOW -> " + item);
                Thread.Sleep(25);
            }
        });

        Task fastTask = Task.Factory.StartNew(() =>
        {
            foreach (var item in fastBC.GetConsumingEnumerable())
            {
                Console.WriteLine("FAST -> " + item);
            }
        });

        // Population two BlockingCollections with the same data. How can I have a single collection?
        for (int i = 0; i < 100; i++)
        {
            while (slowBC.TryAdd(i) == false)
            {
                Console.WriteLine("Wait for slowBC...");
            }

            while (fastBC.TryAdd(i) == false)
            {
                Console.WriteLine("Wait for 2...");
            }
        }

        slowBC.CompleteAdding();
        fastBC.CompleteAdding();

        Task.WaitAll(slowTask, fastTask);

        Console.ReadLine();
    }
}
4

1 回答 1

0
  1. 使用生产者-消费者队列来传输单个整数是非常低效的。你是在块中接收它,所以为什么不将队列输入为 '*chunk' 并发送整个块,立即在同一个 ref 处创建/分离一个新块。rx 的变量。下一批数据?这就是 PC 队列通常用于处理大量数据的方式 - 排队 refs/pointers,而不是实际数据。线程具有共享内存空间,(一些开发人员似乎认为这只会导致问题),所以使用它 - 队列指针/引用并将 MB 数据作为一个指针安全地传输。只要您在下一行代码中总是在将旧线程排队后创建/分离一个新线程,生产者和消费者线程就永远不会在同一个块上运行。

    对于大块,排队 *chunks 的效率要高出 10 倍。

  2. 将 *chunks 发送到快速链接,然后从那里将它们“转发”到慢速链接。

  3. 如果慢速链接不会阻塞您的系统并导致最终的 OOM 错误,您可能需要全面的流量控制。我通常做的是修复总缓冲区大小的“整体”配额,并在启动时创建一个块池(池是另一个 BlockingCollection,在启动时填充 *new(chunks))。生产者线程使块出列,用数据填充它们,将它们排队到 FAST 线程。FAST 线程处理接收到的块,然后将 *chunk 排队到 SLOW 线程。SLOW 线程处理相同的数据,然后重新池化“已使用”块以供生产者线程重新使用。这形成了一个流量控制系统——如果 SLOW thred 太慢,生产者最终尝试从一个空池中分离出一个 *chunk 并阻塞在那里,直到 SLOW 线程重新池化一些使用过的 *chunk 并因此向生产者线程发出再次运行的信号。您可能需要在慢速线程中使用一些策略来使其操作超时并提前转储其 *chunk,因此删除数据 - 您必须根据您的总体要求决定一个策略 - 显然不可能将数据连续排队到快速除非缓慢的消费者转储一些数据,否则永远缓慢的消费者不会出现内存溢出。

编辑 - 哦,是的,使用池消除了已用块上的 GC,进一步提高了性能。

一种总体流策略是不在慢线程中转储任何数据。随着持续的高数据流,*chunks 最终将全部位于快速和慢速线程之间的队列中,并且生产者线程确实会阻塞在空池中。然后,网络连接将应用其自己的流控制来停止网络对等方通过 TCP 发送更多数据。这将流控制从你的慢线程一直延伸到对等点。

于 2012-06-29T08:36:42.833 回答