2

我有以下情况

  • 我正在编写一个处理文件(作业)的服务器
    • 一个文件有一个“前缀”和一个时间
    • 文件要按时间处理(老文件优先)但也要考虑前缀(相同前缀的文件不能同时处理)
  • 我有一个线程(Task with Timer),它监视一个目录并将文件添加到“队列”(生产者)
  • 我有几个从“队列”(消费者)获取文件的消费者 - 他们应该符合上述规则。
    • 每个任务的工作都保存在某个列表中(这表明了约束)
  • 有多个消费者,消费者的数量在启动时确定。

要求之一是能够优雅地停止消费者(立即或让正在进行的进程完成)。

我沿着这条线做了一些事情:

while (processing)
{
    //limits number of concurrent tasks
    _processingSemaphore.Wait(queueCancellationToken);  
    //Take next job when available or wait for cancel signal
    currentwork = workQueue.Take(taskCancellationToken);

    //check that it can actually process this work
    if (CanProcess(currnetWork)
    { 
        var task = CreateTask(currentwork)
        task.ContinueWith((t) => { //release processing slot });
    }
    else
       //release slot, return job? something else?
 }

取消令牌源位于调用方代码中,可以取消。有两个是为了能够在不取消正在运行的任务的同时停止排队。

我厌倦了将“队列”实现为包装“安全”SortedSet 的 BlockingCollection。除了我需要找到与约束匹配的新工作的情况外,一般的想法工作(按时间排序)。如果我将工作返回队列并尝试再次接受,我将得到相同的。

可以从队列中取出作业,直到找到合适的作业,然后返回“非法”作业,但这可能会导致其他消费者处理乱序作业时出现问题

另一种选择是传递一个简单的集合和一种锁定它的方法,然后根据当前的约束锁定并进行简单的搜索。同样,这意味着编写可能不是线程安全的代码。

还有其他可以提供帮助的建议/指针/数据结构吗?

4

2 回答 2

0

我会用TPL Dataflow实现您的要求。看看你可以用它实现生产者-消费者模式的方式。我相信这将满足您的所有要求(包括取消消费者)。

编辑(对于那些不喜欢阅读文档,但谁...)

这是一个如何使用TPL Dataflow实现需求的示例。这种实现的美妙之处在于消费者没有绑定到单个线程,并且仅在需要处理数据时才使用池线程。

    static void Main(string[] args)
    {
        BufferBlock<string> source = new BufferBlock<string>();
        var cancellation = new CancellationTokenSource();
        LinkConsumer(source, "A", cancellation.Token);
        LinkConsumer(source, "B", cancellation.Token);
        LinkConsumer(source, "C", cancellation.Token);

        // Link an action that will process source values that are not processed by other 
        source.LinkTo(new ActionBlock<string>((s) => Console.WriteLine("Default action")));

        while (cancellation.IsCancellationRequested == false)
        {
            ConsoleKey key = Console.ReadKey(true).Key;
            switch (key)
            {
                case ConsoleKey.Escape:
                    cancellation.Cancel();
                    break;
                default:
                    Console.WriteLine("Posted value {0} on thread {1}.", key, Thread.CurrentThread.ManagedThreadId);
                    source.Post(key.ToString());
                    break;
            }
        }

        source.Complete();
        Console.WriteLine("Done.");
        Console.ReadLine();
    }

    private static void LinkConsumer(ISourceBlock<string> source, string prefix, CancellationToken token)
    {
        // Link a consumer that will buffer and process all input of the specified prefix
        var consumer = new ActionBlock<string>(new Action<string>(Process), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1, SingleProducerConstrained = true, CancellationToken = token, TaskScheduler = TaskScheduler.Default });
        var linkDisposable = source.LinkTo(consumer, (p) => p == prefix);

        // Dispose the link (remove the link) when cancellation is requested.
        token.Register(linkDisposable.Dispose);
    }

    private static void Process(string arg)
    {
        Console.WriteLine("Processed value {0} in thread {1}", arg, Thread.CurrentThread.ManagedThreadId);

        // Simulate work
        Thread.Sleep(500);
    }
于 2012-12-14T11:12:11.880 回答
0

我认为 Hans 是对的:如果您已经有一个线程安全的 SortedSet(实现了IProducerConsumerCollection,因此可以在 中使用BlockingCollection),那么您只需将现在可以处理的文件放入集合中。如果您完成了使另一个文件可用于处理的文件,请在此时而不是更早地将另一个文件添加到集合中。

于 2012-12-14T21:04:08.410 回答