我刚刚开始使用任务并行库。所讨论的任务是以尽可能并行的方式处理结果,但要保持结果的顺序。
此外,可以随时添加一个项目,直到设置标志表示不再接受更多项目。
此外,一旦完成所有结果,将需要通知一些客户(只有在没有更多项目被接受时才会发生这种情况)。
我想出了下面的简化示例,它似乎在我的所有测试中都运行良好。
class Program
{
static void Main(string[] args)
{
for (int i = 1; i < random.Next(5, 21); ++i)
{
AddItem(i);
}
finishedAddingItems = true;
completion.Task.Wait();
Console.WriteLine("Finished");
Console.ReadKey();
}
static TaskCompletionSource<bool> completion =
new TaskCompletionSource<bool>();
static bool finishedAddingItems = false;
static Random random = new Random();
class QueueResult
{
public int data;
public int IsFinished;
}
static ConcurrentQueue<QueueResult> queue =
new ConcurrentQueue<QueueResult>();
static object orderingLockObject = new object();
static void AddItem(int i)
{
var queueItem = new QueueResult { data = i, IsFinished = 0 };
queue.Enqueue(queueItem);
Task.Factory
.StartNew(() =>
{
for (int busy = 0;
busy <= random.Next(9000000, 90000000);
++busy)
{ };
Interlocked.Increment(ref queueItem.IsFinished);
})
.ContinueWith(t =>
{
QueueResult result;
//the if check outside the lock is to avoid tying up resources
//needlessly, since only one continuation can actually process
//the queue at a time.
if (queue.TryPeek(out result)
&& result.IsFinished == 1)
{
lock (orderingLockObject)
{
while (queue.TryPeek(out result)
&& result.IsFinished == 1)
{
Console.WriteLine(result.data);
queue.TryDequeue(out result);
}
if (finishedAddingItems && queue.Count == 0)
{
completion.SetResult(true);
}
}
}
});
}
}
但是,我无法说服自己是否存在潜在的竞争条件,即项目可能无法得到处理?