2

我是多线程概念的新手。我需要将一定数量的字符串添加到队列中并使用多个线程处理它们。使用ConcurrentQueue哪个是线程安全的。

这是我尝试过的。但是所有添加到并发队列中的项目都不会被处理。仅处理前 4 个项目。

class Program
{
    ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
    static void Main(string[] args)
    {
        new Program().run();
    }

    void run()
    {
        int threadCount = 4;
        Task[] workers = new Task[threadCount];

        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            Task task = new Task(() => worker(workerId));
            workers[i] = task;
            task.Start();
        }

        for (int i = 0; i < 100; i++)
        {
            iQ.Enqueue("Item" + i);
        }

        Task.WaitAll(workers);
        Console.WriteLine("Done.");

        Console.ReadLine();
    }

    void worker(int workerId)
    {
        Console.WriteLine("Worker {0} is starting.", workerId);
        string op;
        if(iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }

        Console.WriteLine("Worker {0} is stopping.", workerId);
    }


}
4

3 回答 3

7

您的实施存在几个问题。第一个也是很明显的一个是该worker方法仅将零个或一个项目出列然后停止:

    if(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

它应该是:

    while(iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

然而,这不足以使您的程序正常工作。如果您的工作线程出队速度快于主线程入队速度,则它们将在主任务仍在入队时停止。您需要向工人发出信号,他们可以停止。true您可以定义一个布尔变量,一旦入队完成,该变量将被设置为:

for (int i = 0; i < 100; i++)
{
    iQ.Enqueue("Item" + i);
}
Volatile.Write(ref doneEnqueueing, true);

工作人员将检查该值:

void worker(int workerId)
{
    Console.WriteLine("Worker {0} is starting.", workerId);
    do {
        string op;
        while(iQ.TryDequeue(out op))
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
        }
        SpinWait.SpinUntil(() => Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0));
    }
    while (!Volatile.Read(ref doneEnqueueing) || (iQ.Count > 0))
    Console.WriteLine("Worker {0} is stopping.", workerId);
}  
于 2017-01-04T09:09:34.663 回答
3

您的工人从其中取出一件物品queue然后完成工作,让他们工作直到queue空无一人。

if在工作函数中替换为while

void worker(int workerId)
{
    Console.WriteLine("Worker {0} is starting.", workerId);
    string op;
    while (iQ.TryDequeue(out op))
    {
        Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    }

    Console.WriteLine("Worker {0} is stopping.", workerId);
}

当您运行它时,您会看到,几乎所有项目都将由两名工人处理。原因:您的 cpu 有两个内核,两个内核都在工作,并且没有“空闲 tiem slot”来创建新任务。如果你想让你的所有 4 个任务来处理项目,你可以添加一个延迟来给你的处理器时间来创建 anotehr 任务,比如:

while (iQ.TryDequeue(out op))
{
    Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
    Task.Delay(TimeSpan.FromMilliseconds(1)).Wait();
}

为您提供所需的输出:

...
Worker 0 is processing item Item8
Worker 1 is processing item Item9
Worker 2 is processing item Item10
Worker 3 is processing item Item11
Worker 3 is processing item Item13
Worker 1 is processing item Item12
...
于 2017-01-04T09:02:48.557 回答
0

我实际上ConcurrentQueue最近一直在工作,并认为我会分享这个。我创建了一个自定义ConcurrentQueue调用CQItems,它具有使用给定参数构建自身的方法。在内部,当您告诉它构建xy项目时,它会Parallel.For调用项目构造函数。这里的好处是,当一个方法或函数调用CQItems myCQ = CQItems.Generate(x, y)来自基本应用程序线程时,这意味着在完成构建之前没有任何东西可以查看队列。但是在队列类内部,它是用线程构建的,并且比仅使用List<>or快得多Queue<>. 大多数情况下,它是凭空生成的,但有时(基于参数)从 SQL 创建项目 - 基本上是根据现有数据生成对象。无论如何,这些是CQItems类中可以帮助解决此问题的两种方法:

public void Generate(int numberOfItems = 1, ItemOptions options = ItemOptions.NONE)
    {
        try
        {
            Type t = typeof(T);

            if (t == typeof(Item))
                throw new ArgumentException("Type " + t + " is not valid for generation.  Please contact QA.");

            else
                Parallel.For(0, numberOfItems, (i, loopState) =>
                {
                    try
                    {
                        GenerateItemByType(typeof(T), options);
                    }

                    catch
                    {
                        loopState.Stop();
                        throw;
                    }

                });
        }

        catch (AggregateException ae)
        {
            ae.Handle((x) =>
            {
                if (x is SQLNullResultsException)
                {
                    throw x;
                }
                else if (x is ImageNotTIFFException)
                {
                    throw x;
                }
                else
                {
                    throw x;
                }

                return true;
            });
        }

        catch
        {
            throw;
        }

        finally
        {
            ItemManager.Instance.Clear();
        }
    }

    private void GenerateItemByType(Type t, ItemOptions options = ItemOptions.NONE)
    {
        try
        {
            if (t == typeof(ItemA))
            {
                if ((options & ItemOptions.DUPLICATE) != 0)
                {
                    this.Enqueue(new ItemB(options));
                }
                else
                {
                    this.Enqueue(new ItemA(options));
                }
            }
            else if (t == typeof(ItemC))
            {
                this.Enqueue(new ItemC(options));
            }
        }

        catch
        {
            throw;
        }

        finally { }
    }

一些有用的注释:

在 中提供loopState变量Parallel.For()允许我们将状态设置为在捕获到异常时停止。这很好,因为如果你的循环被要求做 1000 件事情,而第 5 次迭代抛出异常,它将继续循环。您可能想要它,但在我的情况下,异常需要退出线程循环。你仍然会AggregateException从它中走出来(显然,这就是线程抛出异常时发生的情况)。将它们解析出来并只发送第一个可以节省大量时间和头疼的尝试清除一个巨大的异常组,其中以后的异常可能(或可能不会)是由于第一个异常引起的。

至于重新抛出,我尝试为大多数预期类型的​​异常添加一个 catch 语句,即使我打算将它们扔到堆栈中。其中一部分是用于故障排除(能够打破特定异常可能很方便)。部分原因是有时我希望能够做其他事情,例如停止循环、更改或添加异常消息,或者在拆分的情况下AggregateException,只将一个异常发送回堆栈而不是整体聚合。对于任何可能正在查看此内容的人来说,这只是一个澄清点。

最后,如果它令人困惑,Type(T)价值来自我的CQItems班级本身:

     public class CQItems<T> : ConcurrentQueue<Item>, IDisposable
于 2018-11-15T13:43:47.020 回答