2

我目前正在做一个项目,我需要排队处理一些工作,这是要求:

  1. 作业必须一次处理一个
  2. 排队的项目必须能够等待

所以我想要类似于:

Task<result> QueueJob(params here)
{
   /// Queue the job and somehow return a waitable task that will wait until the queued job has been executed and return the result.
}

我试过有一个后台运行任务,它只是从队列中拉出项目并处理作业,但困难在于从后台任务到方法。

如果需要,我可以只在 QueueJob 方法中请求完成回调,但如果我能得到一个透明的 Task 回来,让您等待作业被处理(即使有作业),那就太好了在队列之前)。

4

3 回答 3

6

你可能会发现TaskCompletionSource<T>它很有用,它可以用来创建一个Task在你想要的时候完全完成的。如果将它与 结合使用BlockingCollection<T>,您将获得队列:

class JobProcessor<TInput, TOutput> : IDisposable
{
    private readonly Func<TInput, TOutput> m_transform;

    // or a custom type instead of Tuple
    private readonly
        BlockingCollection<Tuple<TInput, TaskCompletionSource<TOutput>>>
        m_queue =
        new BlockingCollection<Tuple<TInput, TaskCompletionSource<TOutput>>>();

    public JobProcessor(Func<TInput, TOutput> transform)
    {
        m_transform = transform;
        Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
    }

    private void ProcessQueue()
    {
        Tuple<TInput, TaskCompletionSource<TOutput>> tuple;
        while (m_queue.TryTake(out tuple, Timeout.Infinite))
        {
            var input = tuple.Item1;
            var tcs = tuple.Item2;

            try
            {
                tcs.SetResult(m_transform(input));
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        }
    }

    public Task<TOutput> QueueJob(TInput input)
    {
        var tcs = new TaskCompletionSource<TOutput>();
        m_queue.Add(Tuple.Create(input, tcs));
        return tcs.Task;
    }

    public void Dispose()
    {
        m_queue.CompleteAdding();
    }
}
于 2013-02-18T09:40:32.517 回答
2

我会去做这样的事情:

class TaskProcessor<TResult>
{
    // TODO: Error handling!

    readonly BlockingCollection<Task<TResult>> blockingCollection = new BlockingCollection<Task<TResult>>(new ConcurrentQueue<Task<TResult>>());

    public Task<TResult> AddTask(Func<TResult> work)
    {
        var task = new Task<TResult>(work);
        blockingCollection.Add(task);
        return task; // give the task back to the caller so they can wait on it
    }

    public void CompleteAddingTasks()
    {
        blockingCollection.CompleteAdding();
    }

    public TaskProcessor()
    {
        ProcessQueue();
    }

    void ProcessQueue()
    {
        Task<TResult> task;
        while (blockingCollection.TryTake(out task))
        {
            task.Start();
            task.Wait(); // ensure this task finishes before we start a new one...
        }
    }
}

根据使用它的应用程序的类型,您可以将 BlockingCollection/ConcurrentQueue 切换为更简单的东西(例如,只是一个普通队列)。您还可以根据要排队的方法/参数来调整“AddTask”方法的签名......

于 2013-02-18T05:48:34.807 回答
1

Func<T>不接受任何参数并返回 T 类型的值。作业一个接一个地运行,您可以等待返回的任务以获取结果。

public class TaskQueue
{
    private Queue<Task> InnerTaskQueue;

    private bool IsJobRunning;

    public void Start()
    {
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                if (InnerTaskQueue.Count > 0 && !IsJobRunning)
                {
                     var task = InnerTaskQueue.Dequeue()
                     task.Start();
                     IsJobRunning = true;
                     task.ContinueWith(t => IsJobRunning = false);
                }
                else
                {
                     Thread.Sleep(1000);
                }
            }
        }
    }

    public Task<T> QueueJob(Func<T> job)
    {
        var task = new Task<T>(() => job());
        InnerTaskQueue.Enqueue(task);
        return task;
    }
}
于 2013-02-18T05:08:56.170 回答