0

我想获取我在阻塞队列中添加的当前正在执行的任务,如何完成?

编辑:我正在使用这个优先级调度程序,并添加多个具有不同优先级的任务:

public class PriorityScheduler : TaskScheduler
{
    public static PriorityScheduler Highest = new PriorityScheduler(ThreadPriority.Highest);
    public static PriorityScheduler AboveNormal = new PriorityScheduler(ThreadPriority.AboveNormal);
    public static PriorityScheduler Normal = new PriorityScheduler(ThreadPriority.Normal);
    public static PriorityScheduler BelowNormal = new PriorityScheduler(ThreadPriority.BelowNormal);
    public static PriorityScheduler Lowest = new PriorityScheduler(ThreadPriority.Lowest);

    public static BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    private Thread[] _threads;
    private ThreadPriority _priority;
    private readonly int _maximumConcurrencyLevel = Math.Max(1, Environment.ProcessorCount);

    public PriorityScheduler(ThreadPriority priority)
    {
        _priority = priority;
    }

    public override int MaximumConcurrencyLevel
    {
        get { return _maximumConcurrencyLevel; }
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks;
    }

    protected override void QueueTask(Task task)
    {
        _tasks.Add(task);

        if (_threads == null)
        {
            _threads = new Thread[_maximumConcurrencyLevel];
            for (int i = 0; i < _threads.Length; i++)
            {
                int local = i;
                _threads[i] = new Thread(() =>
                {
                    foreach (Task t in _tasks.GetConsumingEnumerable())
                        base.TryExecuteTask(t);
                });
                _threads[i].Name = string.Format("PriorityScheduler: ", i);
                _threads[i].Priority = _priority;
                _threads[i].IsBackground = true;
                _threads[i].Start();
            }
        }
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false; // we might not want to execute task that should schedule as high or low priority inline
    }
}

我想根据那里的优先级停止和恢复任务,例如,如果具有更高优先级的新任务到达,较低的停止并让任务执行,然后自行恢复......

4

1 回答 1

2

如果您指的是BlockingCollection<T>,则不能(直接)。当您调用Take()(或通过 获取下一个项目GetConsumingEnumerable())时,该项目(任务?)实际上已从底层连接中删除。

如果您希望它可用,您将需要拥有您的消费者商店并公开“当前任务”。

Note that the Parallel Extension Extras project provides a great QueuedTaskScheduler which may accomplish your goals here. It allows you to create prioritized TaskScheduler instances, and handles all of the scheduling for you.

于 2012-10-05T21:11:54.140 回答