0

也许这个功能已经隐藏在 .NET-Framework 的某个地方,但我找不到它。我需要按给定的顺序一个接一个地执行方法。这些方法应该返回一些东西(例如对象),因此有一种方法可以对返回的数据做出反应(例如,由于发生错误而取消以下方法的执行)。方法的执行应该在它自己的线程中运行,我应该能够随时将方法添加到队列中。知道如何在 c# 中实现它吗?

感谢 Jon 的评论,我尝试自己实现这样的队列。这可能是完全错误的 - 所以非常欢迎评论;-)

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace FunctionQueue
{
    public class ErrorResult
    {
        public string ErrorMessage { get; set; }
        public bool CancelQueue { get; set; }
    }

    public class FunctionQueue
    {
        private readonly ConcurrentQueue<Func<object>> Queue;
        private bool isExecuting;
        private bool isCanceled;
        private readonly Action<ErrorResult> onError;
        private readonly Action onComplete;

        public FunctionQueue(Action<ErrorResult> onError = null, Action onComplete = null)
        {
            this.Queue = new ConcurrentQueue<Func<object>>();
            this.onError = onError;
            this.onComplete = onComplete;
        }

        public void AddFunctionToQueue( Func<object> functionToAdd, bool startQueue = false )
        {
            this.Queue.Enqueue(functionToAdd);
            if (startQueue && !this.isExecuting) this.ProcessQueue();
        }

        public void ProcessQueue()
        {
            if( this.Queue.Count > 0 )
            {
                Task.Run( () =>
                              {
                                  this.isCanceled = false;
                                  this.isExecuting = true;
                                  Func<object> functionToExecuteNext;
                                  while( !isCanceled && this.Queue.TryDequeue( out functionToExecuteNext ) )
                                  {
                                      object result = functionToExecuteNext();
                                      ErrorResult errorResult = result as ErrorResult;
                                      if( errorResult != null )
                                      {
                                          if( this.onError != null ) this.onError( errorResult );
                                          if( errorResult.CancelQueue ) this.isCanceled = true;
                                      }
                                  }
                              } );
            }
            this.isExecuting = false;
            if( this.onComplete != null ) this.onComplete();
        }
    }
}

我想添加另一个功能,但不幸的是我不知道如何实现:我想为每个添加的函数添加一个可选回调。当此给定函数完成时,应调用该回调。如何添加此功能?

4

2 回答 2

4

听起来您可以只使用Func<object>.

假设您使用的是 .NET 4,您应该使用BlockingCollection<T>适当的包装器IProducerConsumerCollection<T>(例如ConcurrentQueue<T>)。这些类型旨在帮助简化生产者/消费者的情况。

您还应该查看Dataflow,它在此之上提供了一些更高级别的构造,例如,如果您需要构建管道。

于 2013-05-13T08:13:26.563 回答
0

你不应该使用ConcurrentQueue<T>. 使用BlockingCollection<T>,它提供了一个更好的包装器,您可以使用它。

然后,您可以使用以下方法处理队列:

public void ProcessQueue()
{
    Func<object> functionToExecuteNext;
    while (!IsCancelled && queue.TryTake(out functionToExecuteNext, Timeout.Infinite))
    {
        // execute the function
    }
}

在你的程序初始化中:

queue = new BlockingCollection<Func<object>>();
var t = new Task(ProcessQueue, TaskCreationOptions.LongRunning);
t.Start();

// program does stuff
// time to shut down
queue.CompleteAdding();  // mark the queue as complete for adding

// at this point you'll want to wait until the queue is empty.
// unless you want to quit immediately. Then just set the IsCancelled flag.

在这里,TryTake在队列上进行非忙等待。它将一直等到一个项目可用,或者直到队列被标记为完成。关键是它在等待时不会消耗 CPU 资源。

当生产者完成向队列添加内容时,它queue.CompleteAdding会调用 ,当队列为空时TryTake退出。false

因此,您可以在程序开始时启动队列处理线程,它会在项目进入时对其进行处理。不需要isExecuting标志,也不需要AddFunctionToQueue检查标志并启动线程的方法。

我使用Task构造函数来创建带有LongRunning选项的任务,以便调度程序可以更好地调度其他工作。

于 2013-05-15T16:49:05.263 回答