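I have a ConcurrentQueue containing a list of URLs whose source I need to fetch. When I use Parallel.ForEach with the ConcurrentQueue object as the input parameter, the Pop method doesn't work (it should return a string).

I'm using Parallel with MaxDegreeOfParallelism set to 4. I really need to limit the number of concurrent threads. Is using a queue together with Parallel redundant?

Thanks in advance.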

// On the main class
var items = await engine.FetchPageWithNumberItems(result);
// Enqueue List of items
itemQueue.EnqueueList(items);
var crawl = Task.Run(() => { engine.CrawlItems(itemQueue); });

// On the Engine class
public void CrawlItems(ItemQueue itemQueue)
{
    Parallel.ForEach(
        itemQueue,
        new ParallelOptions { MaxDegreeOfParallelism = 4 },
        item =>
        {
            var worker = new Worker();
            // Pop doesn't return anything
            worker.Url = itemQueue.Pop();
            /* Some work */
        });
}

// Item Queue
class ItemQueue : ConcurrentQueue<string>
{
    private ConcurrentQueue<string> queue = new ConcurrentQueue<string>();

    public string Pop()
    {
        string value = String.Empty;
        if (this.queue.Count == 0)
            throw new Exception();
        this.queue.TryDequeue(out value);
        return value;
    }

    public void Push(string item)
    {
        this.queue.Enqueue(item);
    }

    public void EnqueueList(List<string> list)
    {
        list.ForEach(this.queue.Enqueue);
    }
}

2 Answers

You don't need ConcurrentQueue<T> if all you're going to do is to first add items to it from a single thread and then iterate it in Parallel.ForEach(). A normal List<T> would be enough for that.
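
A minimal sketch of that suggestion, assuming the URLs are already collected in a List<string> before the parallel loop starts. The names `ListCrawler`, `CrawlAll` and the `process` callback are hypothetical; `process` stands in for whatever the Worker in the question does with each URL.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ListCrawler
{
    // Hypothetical helper: invokes `process` once per URL, with at most
    // four iterations running concurrently.
    public static void CrawlAll(List<string> urls, Action<string> process)
    {
        Parallel.ForEach(
            urls,
            new ParallelOptions { MaxDegreeOfParallelism = 4 },
            url => process(url)); // each URL arrives as the lambda parameter
    }
}
```

Since the list is only read during the loop, no concurrent collection is needed for the input; the callback itself still has to be safe to call from several threads.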

Also, your implementation of ItemQueue is very suspicious:

  • It inherits from ConcurrentQueue<string> and also contains another ConcurrentQueue<string>. That doesn't make much sense, is confusing and inefficient.

  • The methods on ConcurrentQueue<T> were designed very carefully to be thread-safe. Your Pop() isn't thread-safe. What could happen is that you check Count, notice it's 1, then call TryDequeue() and not get any value (i.e. value will be null), because another thread removed the item from the queue in the time between the two calls.
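
If a queue wrapper is still wanted, both points above could be addressed roughly as follows. This is only a sketch: `UrlQueue` and `TryPop` are hypothetical names, and it assumes callers are happy to check a bool result instead of catching an exception.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical replacement for ItemQueue: wraps a single queue instead of
// inheriting from ConcurrentQueue<string> and also holding a second one.
public sealed class UrlQueue
{
    private readonly ConcurrentQueue<string> queue = new ConcurrentQueue<string>();

    public void Push(string item) => queue.Enqueue(item);

    public void EnqueueList(IEnumerable<string> items)
    {
        foreach (var item in items)
            queue.Enqueue(item);
    }

    // Returns false when the queue is empty. There is no separate Count
    // check, so another thread cannot empty the queue between a check
    // and the removal.
    public bool TryPop(out string value) => queue.TryDequeue(out value);
}
```

A consumer would then loop with `while (urlQueue.TryPop(out var url)) { /* work */ }` and simply stop when the call returns false.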

Answered 2016-07-10T00:56:04.587

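The problem is in the CrawlItems method: you shouldn't call Pop inside the action you pass to ForEach. The action is invoked once for each item in the source, so the item has already been handed to you. That is why the action has the "item" parameter.

I assume you get null because the other threads have already consumed all the items through the ForEach method.

Therefore, your code should look like this: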

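public void CrawlItems(ItemQueue itemQueue)
{
    Parallel.ForEach(
        itemQueue,
        new ParallelOptions { MaxDegreeOfParallelism = 4 },
        item =>
        {
            // One worker per item, as in the question's code
            var worker = new Worker();
            // Use the item supplied by ForEach instead of calling Pop()
            worker.Url = item;
            /* Some work */
        });
}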
Answered 2016-06-11T21:23:56.147