1

I want two threads to be working with one queue. First thread should be called every 2 seconds, and the second thread - every 3 seconds. Both threads should start at the same time. I have a problem when accessing first element of the queue. Both threads take the element with index 0. Sometimes it happens with other elements of the queue, not only with the first element. I have such an output on Console:

  • Item 0 processed by 1 Time: 3:27:8
  • Item 0 processed by 2 Time: 3:27:8
  • Item 2 processed by 1 Time: 3:27:10
  • Item 3 processed by 2 Time: 3:27:11
  • Item 4 processed by 1 Time: 3:27:12

and so on..

Here is the code I use:

    ConcurrentQueue<int> sharedQueue = new ConcurrentQueue<int>();

    for (int i = 0; i < 10; i++)
    {
        sharedQueue.Enqueue(i);
    }


    int itemCount= 0;


    Task[] tasks = new Task[2];
    for (int i = 0; i < tasks.Length; i++)
    {
        // create the new task
        tasks[i] = new Task(() =>
        {
            while (sharedQueue.Count > 0)
            {
                // define a variable for the dequeue requests
                int queueElement;
                // take an item from the queue
                bool gotElement = sharedQueue.TryDequeue(out queueElement);
                // increment the count of items processed
                if (gotElement)
                {
                    DateTime dt = DateTime.Now;
                    Console.WriteLine("Item " + itemCount + "processed by " 
                        + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
                    Interlocked.Increment(ref itemCount);   
               if (Task.CurrentId == 1) 
                    Thread.Sleep(2000);
                else 
                    Thread.Sleep(3000);                       
                }

            }
        });
        // start the new task
        tasks[i].Start();


    }
    // wait for the tasks to complete
    Task.WaitAll(tasks);
    // report on the number of items processed
    Console.WriteLine("Items processed: {0}", itemCount);
    // wait for input before exiting
    Console.WriteLine("Press enter to finish");
    Console.ReadLine();
}
4

2 回答 2

5

替换以下行:

Console.WriteLine("Item " + itemCount + "processed by " ...);

有了这条线:

Console.WriteLine("Item " + queueElement + "processed by " ...);

您看到的问题可能是由于任务Console.WriteLine几乎同时执行,并且两者都看到相同的值,itemCount因为它们以Interlocked.Increment尚未发生调用的方式交错。无论如何打印出来可能更有意义,queueElement因为它更有意义。

于 2013-09-06T00:44:43.377 回答
4

请参阅Brian Gideon关于您的itemCount问题的出色回答。

您可能会考虑重写代码以使用BlockingCollection而不是ConcurrentQueue<T>. 使用起来要容易得多。BlockingCollection是并发集合的包装器。在其默认配置中,后备存储是ConcurrentQueue. 因此,您可以获得相同的并发队列功能,但界面要好得多。

BlockingCollection<int> sharedQueue = new BlockingCollection<int>();

for (int i = 0; i < 10; i++)
{
    sharedQueue.Add(i);
}

// CompleteAdding marks the queue as "complete for adding,"
// meaning that no more items will be added.
sharedQueue.CompleteAdding();

int itemCount= 0;

Task[] tasks = new Task[2];
for (int i = 0; i < tasks.Length; i++)
{
    // create the new task
    tasks[i] = new Task(() =>
    {
        foreach (var queueElement in sharedQueue.GetConsumingEnumerable())
        {
            DateTime dt = DateTime.Now;
            Console.WriteLine("Item " + itemCount + "processed by " 
                + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
            Interlocked.Increment(ref itemCount);   
            if (Task.CurrentId == 1) 
                Thread.Sleep(2000);
            else 
                Thread.Sleep(3000);                       
        }
    });

    // start the new task
    tasks[i].Start();
}

GetConsumingEnumerable返回一个枚举器,它将从队列中获取下一项,直到队列为空。它还可以很好地处理取消,这对于ConcurrentQueue.

一般来说,任何时候你想到使用ConcurrentQueue<T>,你都可能想要BlockingCollection<T>.

于 2013-09-06T19:26:01.967 回答