2

我在访问相同 BlockingCollection 的 C# 应用程序中使用两个线程。这很好用,但我想两次检索第一个值,以便两个线程检索相同的值 *。

几秒钟后,我想轮询两个线程的 currentIndex 并删除每个 value < index. 例如,线程的最低 currentIndex 为 5,应用程序删除队列中索引 0 -5 处的项目。另一种解决方案是如果所有线程都处理了该值,则删除队列中的值。

我怎样才能做到这一点?我想我需要另一种类型的缓冲区..?

先感谢您!

*如果thread1调用了.Take(),则该item在collection中被移除,thread2无法再次获取相同的item。


更新:

我想将数据存储在缓冲区中,例如 thread1 将数据保存到 HDD,thread2 分析(相同)数据(并发)。

4

2 回答 2

5

使用生产者-消费者将 Value1 添加到两个单独的 ConcurrentQueue。让线程出队,然后从自己的队列中处理它们。

2014 年 7 月 4 日编辑:这是一个模糊、笨拙且经过深思熟虑的解决方案:创建一个缓冲的自定义对象。它可能包含您尝试在线程 1 中缓冲的信息和线程 2 中的分析结果的空间。

将对象添加到线程 1 中的缓冲区和 BlockingCollection。使用线程 2 分析结果并使用结果更新对象。阻塞集合不应该变得太大,因为它只处理引用不应该影响你的记忆。这假设您不会同时在两个线程上修改缓冲区中的信息。

另一个也是经过深思熟虑的解决方案是将信息同时输入缓冲区和阻塞集合。分析来自 BlockingCollection 的数据,将其输入输出集合并再次将它们与缓冲区匹配。如果操作正确,此选项可以处理并发修改,但可能需要更多工作。

我认为选项一更好。正如我所指出的,这些只是半成品,但它们可能会帮助您找到适合您特定需求的东西。祝你好运。

于 2014-04-03T13:47:56.690 回答
1

我建议重新考虑你的设计。

当您有一个必须处理的项目列表时,然后给每个线程一个他必须处理的项目队列。

使用这样的解决方案,为两个或多个线程提供相同的处理值不会有问题。

像这样的东西,没有测试只是打字。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Collections.Concurrent;

namespace ConsoleApplication2
{

  class Item
  {
    private int _value;
    public int Value
    {
      get
      {
        return _value;
      }
    }

    // all you need
    public Item(int i)
    {
      _value = i;
    }
  }

  class WorkerParameters
  {
    public ConcurrentQueue<Item> Items = new ConcurrentQueue<Item>();
  }

  class Worker
  {
    private Thread _thread;
    private WorkerParameters _params = new WorkerParameters();

    public void EnqueueItem(Item item)
    {
      _params.Items.Enqueue(item);
    }

    public void Start()
    {
      _thread = new Thread(new ParameterizedThreadStart(ThreadProc));
      _thread.Start();
    }

    public void Stop()
    {
      // build somthing to stop your thread
    }

    public static void ThreadProc(object threadParams)
    {
      WorkerParameters p = (WorkerParameters)threadParams;
      while (true)
      {
        while (p.Items.Count > 0)
        {
          Item item = null;
          p.Items.TryDequeue(out item);

          if (item != null)
          {
            // do something
          }

        }
        System.Threading.Thread.Sleep(50);
      }
    }
  }

  class Program
  {
    static void Main(string[] args)
    {

      Worker w1 = new Worker();
      Worker w2 = new Worker();
      w1.Start();
      w2.Start();

      List<Item> itemsToProcess = new List<Item>();
      for (int i = 1; i < 1000; i++)
      {
        itemsToProcess.Add(new Item(i));
      }

      for (int i = 1; i < 1000; i++)
      {
        w1.EnqueueItem(itemsToProcess[i]);
        w2.EnqueueItem(itemsToProcess[i]);
      }


    }
  }
}
于 2014-04-08T15:29:02.860 回答