我有以下情况
- 我正在编写一个处理文件(作业)的服务器
- 一个文件有一个“前缀”和一个时间
- 文件要按时间处理(老文件优先)但也要考虑前缀(相同前缀的文件不能同时处理)
- 我有一个线程(Task with Timer),它监视一个目录并将文件添加到“队列”(生产者)
- 我有几个从“队列”(消费者)获取文件的消费者 - 他们应该符合上述规则。
- 每个任务的工作都保存在某个列表中(这表明了约束)
- 有多个消费者,消费者的数量在启动时确定。
要求之一是能够优雅地停止消费者(立即或让正在进行的进程完成)。
我沿着这条线做了一些事情:
while (processing)
{
//limits number of concurrent tasks
_processingSemaphore.Wait(queueCancellationToken);
//Take next job when available or wait for cancel signal
currentwork = workQueue.Take(taskCancellationToken);
//check that it can actually process this work
if (CanProcess(currnetWork)
{
var task = CreateTask(currentwork)
task.ContinueWith((t) => { //release processing slot });
}
else
//release slot, return job? something else?
}
取消令牌源位于调用方代码中,可以取消。有两个是为了能够在不取消正在运行的任务的同时停止排队。
我厌倦了将“队列”实现为包装“安全”SortedSet 的 BlockingCollection。除了我需要找到与约束匹配的新工作的情况外,一般的想法工作(按时间排序)。如果我将工作返回队列并尝试再次接受,我将得到相同的。
可以从队列中取出作业,直到找到合适的作业,然后返回“非法”作业,但这可能会导致其他消费者处理乱序作业时出现问题
另一种选择是传递一个简单的集合和一种锁定它的方法,然后根据当前的约束锁定并进行简单的搜索。同样,这意味着编写可能不是线程安全的代码。
还有其他可以提供帮助的建议/指针/数据结构吗?