之前是否有任何将任务添加到具有不同优先级的 TPL 运行时的工作?
如果没有,一般来说,我将如何实施?
理想情况下,我计划使用生产者-消费者模式将“待办事项”工作添加到 TPL。有时我发现需要将低优先级作业升级为高优先级作业(相对于其他作业)。
如果有人有一些我在搜索时应该使用的搜索关键字,请提及它们,因为我还没有找到可以满足我需要的代码。
之前是否有任何将任务添加到具有不同优先级的 TPL 运行时的工作?
如果没有,一般来说,我将如何实施?
理想情况下,我计划使用生产者-消费者模式将“待办事项”工作添加到 TPL。有时我发现需要将低优先级作业升级为高优先级作业(相对于其他作业)。
如果有人有一些我在搜索时应该使用的搜索关键字,请提及它们,因为我还没有找到可以满足我需要的代码。
所以这里是一个相当幼稚的优先级队列周围的相当幼稚的并发实现。这里的想法是,有一个排序集包含真实项目和优先级的对,但给出了一个只比较优先级的比较器。构造函数接受一个计算给定对象优先级的函数。
至于实际实施,它们没有有效实施,我只是lock
围绕着一切。创建更高效的实现将阻止将其SortedSet
用作优先级队列,并且重新实现可以同时有效访问的那些实现并不那么容易。
为了更改项目的优先级,您需要从集合中删除该项目,然后再次添加它,并且要在不迭代整个集合的情况下找到它,您需要知道旧的优先级以及新的优先级.
public class ConcurrentPriorityQueue<T> : IProducerConsumerCollection<T>
{
private object key = new object();
private SortedSet<Tuple<T, int>> set;
private Func<T, int> prioritySelector;
public ConcurrentPriorityQueue(Func<T, int> prioritySelector, IComparer<T> comparer = null)
{
this.prioritySelector = prioritySelector;
set = new SortedSet<Tuple<T, int>>(
new MyComparer<T>(comparer ?? Comparer<T>.Default));
}
private class MyComparer<T> : IComparer<Tuple<T, int>>
{
private IComparer<T> comparer;
public MyComparer(IComparer<T> comparer)
{
this.comparer = comparer;
}
public int Compare(Tuple<T, int> first, Tuple<T, int> second)
{
var returnValue = first.Item2.CompareTo(second.Item2);
if (returnValue == 0)
returnValue = comparer.Compare(first.Item1, second.Item1);
return returnValue;
}
}
public bool TryAdd(T item)
{
lock (key)
{
return set.Add(Tuple.Create(item, prioritySelector(item)));
}
}
public bool TryTake(out T item)
{
lock (key)
{
if (set.Count > 0)
{
var first = set.First();
item = first.Item1;
return set.Remove(first);
}
else
{
item = default(T);
return false;
}
}
}
public bool ChangePriority(T item, int oldPriority, int newPriority)
{
lock (key)
{
if (set.Remove(Tuple.Create(item, oldPriority)))
{
return set.Add(Tuple.Create(item, newPriority));
}
else
return false;
}
}
public bool ChangePriority(T item)
{
lock (key)
{
var result = set.FirstOrDefault(pair => object.Equals(pair.Item1, item));
if (object.Equals(result.Item1, item))
{
return ChangePriority(item, result.Item2, prioritySelector(item));
}
else
{
return false;
}
}
}
public void CopyTo(T[] array, int index)
{
lock (key)
{
foreach (var item in set.Select(pair => pair.Item1))
{
array[index++] = item;
}
}
}
public T[] ToArray()
{
lock (key)
{
return set.Select(pair => pair.Item1).ToArray();
}
}
public IEnumerator<T> GetEnumerator()
{
return ToArray().AsEnumerable().GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public void CopyTo(Array array, int index)
{
lock (key)
{
foreach (var item in set.Select(pair => pair.Item1))
{
array.SetValue(item, index++);
}
}
}
public int Count
{
get { lock (key) { return set.Count; } }
}
public bool IsSynchronized
{
get { return true; }
}
public object SyncRoot
{
get { return key; }
}
}
一旦你有了一个IProducerConsumerCollection<T>
实例,也就是上面的对象,你可以将它用作 a 的内部支持对象,BlockingCollection<T>
以便有一个更易于使用的用户界面。
ParallelExtensionsExtras 包含几个自定义TaskScheduler
s,它们可以直接提供帮助,也可以作为您自己的调度程序的基础。
具体来说,有两个调度程序可能会让您感兴趣:
QueuedTaskScheduler
,它允许您以Task
不同的优先级调度 s,但不允许更改已入队Task
的 s 的优先级。ReprioritizableTaskScheduler
,它没有不同的优先级,但允许您将特定的移动Task
到队列的前面或后面。(虽然在当前等待的 s 数量中更改优先级是 O(n),但Task
如果您同时有许多 s,这可能是个问题Task
。)我已经实现了优先级 FIFO 任务队列。
它非常简单,就像具有优先级的 ActionBlock 一样工作。