1

我需要为不同的任务创建一个队列。目前,这是通过http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse提供的示例的定制版本完成的:

using System;
using System.Threading;
using System.Collections.Generic;

public class PCQueue
{
  readonly object _locker = new object();
  Thread[] _workers;
  Queue<Action> _itemQ = new Queue<Action>();

  public PCQueue (int workerCount)
  {
    _workers = new Thread [workerCount];

    // Create and start a separate thread for each worker
    for (int i = 0; i < workerCount; i++)
      (_workers [i] = new Thread (Consume)).Start();
  }

  public void Shutdown (bool waitForWorkers)
  {
    // Enqueue one null item per worker to make each exit.
    foreach (Thread worker in _workers)
      EnqueueItem (null);

    // Wait for workers to finish
    if (waitForWorkers)
      foreach (Thread worker in _workers)
        worker.Join();
  }

  public void EnqueueItem (Action item)
  {
    lock (_locker)
    {
      _itemQ.Enqueue (item);           // We must pulse because we're
      Monitor.Pulse (_locker);         // changing a blocking condition.
    }
  }

  void Consume()
  {
    while (true)                        // Keep consuming until
    {                                   // told otherwise.
      Action item;
      lock (_locker)
      {
        while (_itemQ.Count == 0) Monitor.Wait (_locker);
        item = _itemQ.Dequeue();
      }
      if (item == null) return;         // This signals our exit.
      item();                           // Execute item.
    }
  }
}

使用主要方法:

static void Main()
{
  PCQueue q = new PCQueue (2);

  Console.WriteLine ("Enqueuing 10 items...");

  for (int i = 0; i < 10; i++)
  {
    int itemNumber = i;      // To avoid the captured variable trap
    q.EnqueueItem (() =>
    {
      Thread.Sleep (1000);          // Simulate time-consuming work
      Console.Write (" Task" + itemNumber);
    });
  }

  q.Shutdown (true);
  Console.WriteLine();
  Console.WriteLine ("Workers complete!");
}

然而,在浏览 stackoverflow 时,我偶然发现了这个修改后的版本:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace Project
{
    /// <summary>
    /// Description of Multithread.
    /// </summary>
     public class Multithread<T> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<T> taskQ = new Queue<T>();

        public void TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }

        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }

        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                System.Diagnostics.Debug.WriteLine(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}

这似乎提供了更好的可用性。但是我不知道如何正确地将任务添加到这个队列中。

classname testclass = new classname();
Multithread<classname> testthread = new Multithread<classname>();

我认为这将是类似的东西:

testthread.EnqueueTask(testclass.functioname());

但是,它似乎不起作用。我被困在这个问题上,在其他地方找不到任何解决这个问题的方法。

4

2 回答 2

1

您可以通过使用BlockingCollection. 这个数据结构被实现为一个已经封装了生产者-消费者逻辑的队列。

public class PCQueue
{
  private Thread[] workers;
  private BlockingCollection<Action> queue = new BlockingCollection<Action>();
  private CancellationTokenSource cts = new CancellationTokenSource();

  public PCQueue(int workerCount)
  {
    workers = new Thread[workerCount];
    for (int i = 0; i < workerCount; i++)
    {
      workers[i] = new Thread(Run);
      workers[i].Start();
    }
  }

  public void Shutdown(bool waitForWorkers)
  {
    cts.Cancel();
    if (waitForWorkers)
    {
      foreach (Thread thread in workers)
      {
        thread.Join();
      }
    }
  }

  public void EnqueueItem(Action action)
  {
    queue.Add(action);
  }

  private void Consumer()
  {
    while (true)
    {
      Action action = queue.Take(cts.Token);
      try
      {
        if (action != null) action();
      }
      catch (Exception caught)
      {
        // Notify somebody that something bad happened.
      }
    }
  }
}
于 2013-09-05T16:45:25.127 回答
1

我看不出如何Multithread提供更好的可用性,因为它似乎展示了如何通过没有真正决定如何实际消费项目来以通用方式实现生产者/消费者模式。另一方面PCQueue,与允许它实际消费项目的操作一起工作。

要修改Multithread以允许它做一些工作,您可以删除泛型类型参数T并替换所有出现的Tby Action。在该Consume方法中,您需要替换代码

System.Diagnostics.Debug.WriteLine(task);
Thread.Sleep(1000);              // Simulate time-consuming task

经过

task();

要将任务排入队列,您应该完全按照使用时所做的那样,PCQueue通过提供Action. 您可以为此使用 lambda 表达式。

于 2013-09-05T09:52:52.987 回答