
I have the following scenario:

  1. I load 50 jobs from the database into a BlockingCollection.

  2. Each job is (or potentially could be) long-running, so I want to run each one on a separate thread. (I know it may be better to run them with Task.WhenAll and let the TPL figure it out, but I want to control how many run simultaneously.)

  3. Say I want to run 5 of them simultaneously (the number is configurable).

  4. I create 5 TPL tasks, one per job, and run them in parallel.

What I want is to pick up the next job from the blocking collection as soon as any one of the jobs from step 4 completes, and keep going until all 50 are done.

I am thinking of creating a static BlockingCollection and a TaskCompletionSource that is signaled when a job completes, so that the consumer can be called again to take the next job from the queue, one at a time. I would also like to use async/await for each job, but that comes on top of this; I'm not sure whether it affects the approach.

Is this the right way to accomplish what I'm trying to do?

This is similar to this link, but the catch is that I want to start the next job as soon as any one of the first N items is done, not after all N are done.
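The "start the next job as soon as any running job finishes" requirement can be sketched as a sliding window over Task.WhenAny. This is a minimal sketch, not the asker's TaskCompletionSource design: `SlidingWindow.RunAsync`, `FakeJobAsync` and `Run` are hypothetical names, and each "job" only simulates work with `Task.Delay`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class SlidingWindow
{
    // Keeps at most maxConcurrency jobs in flight. A new job starts as soon as
    // any running job finishes (Task.WhenAny), not after a whole batch (Task.WhenAll).
    public static async Task RunAsync<T>(IEnumerable<T> jobs, Func<T, Task> process, int maxConcurrency)
    {
        var running = new List<Task>();
        foreach (var job in jobs)
        {
            if (running.Count == maxConcurrency)
            {
                Task finished = await Task.WhenAny(running);
                running.Remove(finished);
                await finished; // rethrows if that job failed
            }
            running.Add(process(job));
        }
        await Task.WhenAll(running); // wait for the last few jobs
    }
}

class Program
{
    static readonly object gate = new object();
    static int current, max;

    // Hypothetical stand-in for a long-running job: it only waits 10-40 ms.
    static async Task FakeJobAsync(int id)
    {
        lock (gate) { current++; max = Math.Max(max, current); }
        await Task.Delay(10 + (id % 4) * 10);
        lock (gate) { current--; }
    }

    public static int Run()
    {
        SlidingWindow.RunAsync(Enumerable.Range(0, 50), FakeJobAsync, 5).Wait();
        return max; // highest number of jobs observed running at once
    }

    static void Main()
    {
        Console.WriteLine("Max concurrent jobs: {0}", Run());
    }
}
```

The cap holds because a job decrements the counter before its task completes, so the loop can only add a new job after a slot has really been freed.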

Update:

OK, here is a code snippet that does exactly what I want, in case someone needs it later. As you can see below, 5 threads are in use and each one starts the next job when it is done with the current one, so only 5 jobs are active at any given time. I understand it may not always behave exactly like this, and it will suffer from context-switching overhead on a single CPU/core.

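using System.Threading.Tasks.Dataflow;

var block = new ActionBlock<Job>(
    job => Handler.HandleJob(job),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

foreach (Job j in GetJobs())
    block.Post(j);        // the block is unbounded, so Post always accepts

block.Complete();         // no more jobs will be sent
block.Completion.Wait();  // wait until every queued job has finished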

Job 2 started on thread :13. wait time:3600000ms. Time:8/29/2014 3:14:43 PM
Job 4 started on thread :14. wait time:15000ms. Time:8/29/2014 3:14:43 PM
Job 0 started on thread :7. wait time:600000ms. Time:8/29/2014 3:14:43 PM
Job 1 started on thread :12. wait time:900000ms. Time:8/29/2014 3:14:43 PM
Job 3 started on thread :11. wait time:120000ms. Time:8/29/2014 3:14:43 PM
job 4 finished on thread :14. 8/29/2014 3:14:58 PM
Job 5 started on thread :14. wait time:1800000ms. Time:8/29/2014 3:14:58 PM
job 3 finished on thread :11. 8/29/2014 3:16:43 PM
Job 6 started on thread :11. wait time:1200000ms. Time:8/29/2014 3:16:43 PM
job 0 finished on thread :7. 8/29/2014 3:24:43 PM
Job 7 started on thread :7. wait time:30000ms. Time:8/29/2014 3:24:43 PM
job 7 finished on thread :7. 8/29/2014 3:25:13 PM
Job 8 started on thread :7. wait time:100000ms. Time:8/29/2014 3:25:13 PM
job 8 finished on thread :7. 8/29/2014 3:26:53 PM
Job 9 started on thread :7. wait time:900000ms. Time:8/29/2014 3:26:53 PM
job 1 finished on thread :12. 8/29/2014 3:29:43 PM
Job 10 started on thread :12. wait time:300000ms. Time:8/29/2014 3:29:43 PM
job 10 finished on thread :12. 8/29/2014 3:34:43 PM
Job 11 started on thread :12. wait time:600000ms. Time:8/29/2014 3:34:43 PM
job 6 finished on thread :11. 8/29/2014 3:36:43 PM
Job 12 started on thread :11. wait time:300000ms. Time:8/29/2014 3:36:43 PM
job 12 finished on thread :11. 8/29/2014 3:41:43 PM
Job 13 started on thread :11. wait time:100000ms. Time:8/29/2014 3:41:43 PM
job 9 finished on thread :7. 8/29/2014 3:41:53 PM
Job 14 started on thread :7. wait time:300000ms. Time:8/29/2014 3:41:53 PM
job 13 finished on thread :11. 8/29/2014 3:43:23 PM
job 11 finished on thread :12. 8/29/2014 3:44:43 PM
job 5 finished on thread :14. 8/29/2014 3:44:58 PM
job 14 finished on thread :7. 8/29/2014 3:46:53 PM
job 2 finished on thread :13. 8/29/2014 4:14:43 PM


3 Answers


You can use TPL Dataflow.

What you can do is use a BufferBlock<T>, a buffer that stores your data, and link it to an ActionBlock<T> that consumes the requests as they come in from the BufferBlock<T>.

Now, the beauty here is that you can use the ExecutionDataflowBlockOptions class to specify how many requests the ActionBlock<T> should process concurrently.

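Here is a simplified console version that processes a bunch of numbers as they come in, printing each number along with the Thread.ManagedThreadId it runs on: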

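using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static void Main(string[] args)
    {
        var bufferBlock = new BufferBlock<int>();

        // At most 5 numbers are processed at the same time.
        var actionBlock =
            new ActionBlock<int>(i => Console.WriteLine("Reading number {0} in thread {1}",
                                      i, Thread.CurrentThread.ManagedThreadId),
                                 new ExecutionDataflowBlockOptions
                                     {MaxDegreeOfParallelism = 5});

        // PropagateCompletion lets the action block complete once the buffer is drained.
        bufferBlock.LinkTo(actionBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        Produce(bufferBlock);

        // Signal that no more items will arrive, then wait for processing to finish.
        bufferBlock.Complete();
        actionBlock.Completion.Wait();
    }

    private static void Produce(BufferBlock<int> bufferBlock)
    {
        foreach (var num in Enumerable.Range(0, 500))
        {
            bufferBlock.Post(num);
        }
    }
}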

If you need to, you can also post the items asynchronously with the awaitable BufferBlock.SendAsync.
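A minimal sketch of the asynchronous variant, assuming both blocks are given a `BoundedCapacity` (the values 10 and 5 are arbitrary, not from the answer). With the default unbounded capacity, `SendAsync` completes immediately; once the blocks are bounded, the awaited task only completes when the buffer has room, so the producer is throttled as well. `RunAsync` and the `Task.Delay` work are hypothetical placeholders.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    public static async Task<int> RunAsync(int count)
    {
        int processed = 0;

        // Bounded blocks: the buffer holds at most 10 items and the action block
        // at most 5, so the producer cannot run arbitrarily far ahead.
        var bufferBlock = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 10 });
        var actionBlock = new ActionBlock<int>(
            async i =>
            {
                await Task.Delay(5);                  // simulated work
                Interlocked.Increment(ref processed);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, BoundedCapacity = 5 });

        bufferBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int num = 0; num < count; num++)
        {
            // Post would return false while the bounded buffer is full;
            // SendAsync returns a task that completes once the item is accepted.
            await bufferBlock.SendAsync(num);
        }

        bufferBlock.Complete();
        await actionBlock.Completion;
        return processed;
    }

    static async Task Main()
    {
        Console.WriteLine("Processed {0} items", await RunAsync(100));
    }
}
```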

That way, you let the TPL handle all the throttling for you instead of doing it manually.

answered 2014-08-29T07:28:49.487

You can use BlockingCollection and it will work fine, but it was built before async-await, so it blocks synchronously, which will probably be less scalable in most cases.
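For comparison, here is a sketch of the BlockingCollection approach the question describes: a fixed number of consumers share one queue, and each takes the next job as soon as it finishes its current one. It assumes synchronous jobs (simulated with `Thread.Sleep`), and `Run` is a hypothetical helper. Each consumer occupies a dedicated thread even while it waits, which is the scalability cost mentioned above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    // Processes jobCount jobs with workerCount long-running consumers sharing one queue.
    public static int Run(int jobCount, int workerCount)
    {
        int processed = 0;
        using (var queue = new BlockingCollection<int>())
        {
            var workers = Enumerable.Range(0, workerCount)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    // GetConsumingEnumerable blocks while the queue is empty and ends
                    // once CompleteAdding has been called and the queue is drained.
                    foreach (var job in queue.GetConsumingEnumerable())
                    {
                        Thread.Sleep(job % 3 + 1); // simulated synchronous work
                        Interlocked.Increment(ref processed);
                    }
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            for (int i = 0; i < jobCount; i++)
                queue.Add(i);
            queue.CompleteAdding(); // lets the consumers exit when the queue is empty

            Task.WaitAll(workers);
        }
        return processed;
    }

    static void Main()
    {
        Console.WriteLine("Processed {0} jobs", Run(50, 5));
    }
}
```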

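You're better off using the async-ready TPL Dataflow, as Yuval Itzchakov suggested. All you need is an ActionBlock that processes the items with a MaxDegreeOfParallelism of 5, and then you post your work to it either synchronously (block.Post(item)) or asynchronously (await block.SendAsync(item)):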

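using System.Threading.Tasks.Dataflow;

class Program
{
    private static void Main()
    {
        // At most 5 jobs are processed at once; the async delegate lets the
        // block await each job's Task without blocking a thread.
        var block = new ActionBlock<Job>(
            async job => await job.ProcessAsync(),
            new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 5});

        for (var i = 0; i < 50; i++)
        {
            block.Post(new Job());
        }

        block.Complete();         // no more jobs will be posted
        block.Completion.Wait();  // wait until all 50 jobs have finished
    }
}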
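To see that `MaxDegreeOfParallelism` caps the number of in-flight async jobs, here is a runnable variant with a hypothetical `Job` class whose `ProcessAsync` only awaits `Task.Delay` and records how many jobs run at once. The exact peak depends on thread-pool timing, but it should never exceed 5, because the block does not start a sixth job until one of the returned tasks has completed.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical Job type: ProcessAsync simulates I/O-bound work with Task.Delay
// and records the highest number of jobs running at the same time.
class Job
{
    static readonly object gate = new object();
    static int current, max;

    public static int MaxObserved { get { lock (gate) return max; } }

    public async Task ProcessAsync()
    {
        lock (gate) { current++; max = Math.Max(max, current); }
        await Task.Delay(20); // no thread is blocked while this job waits
        lock (gate) { current--; }
    }
}

class Program
{
    public static int Run()
    {
        var block = new ActionBlock<Job>(
            async job => await job.ProcessAsync(),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

        for (var i = 0; i < 50; i++)
        {
            block.Post(new Job());
        }

        block.Complete();
        block.Completion.Wait(); // all 50 jobs have finished after this
        return Job.MaxObserved;
    }

    static void Main()
    {
        Console.WriteLine("Max jobs in flight: {0}", Run());
    }
}
```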
answered 2014-08-29T10:59:10.120

You can use SemaphoreSlim like in this answer, or use ForEachAsync like in this answer.
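Since the linked answers are not reproduced here, this is only a sketch of the general SemaphoreSlim technique: every item gets a task, but each task waits on a semaphore with 5 slots before running its body and releases the slot when done, so a waiting item starts as soon as any running one finishes. `ForEachThrottledAsync` is a hypothetical name (not the ForEachAsync from the linked answer), and the jobs are simulated with `Task.Delay`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class Throttled
{
    // Runs body for every item, with at most maxConcurrency bodies executing at once.
    public static async Task ForEachThrottledAsync<T>(
        this IEnumerable<T> source, int maxConcurrency, Func<T, Task> body)
    {
        using (var semaphore = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = source.Select(async item =>
            {
                await semaphore.WaitAsync();     // wait for a free slot
                try { await body(item); }
                finally { semaphore.Release(); } // free the slot even if body throws
            }).ToList();
            await Task.WhenAll(tasks);
        }
    }
}

class Program
{
    static readonly object gate = new object();
    static int current, max;

    public static int Run()
    {
        Enumerable.Range(0, 50).ForEachThrottledAsync(5, async i =>
        {
            lock (gate) { current++; max = Math.Max(max, current); }
            await Task.Delay(10); // simulated job
            lock (gate) { current--; }
        }).Wait();
        return max; // highest number of bodies observed running at once
    }

    static void Main()
    {
        Console.WriteLine("Max concurrent jobs: {0}", Run());
    }
}
```

Unlike the Dataflow version, this creates all 50 tasks up front and only throttles their bodies, which is fine for a few dozen jobs but worth keeping in mind for very large inputs.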

answered 2014-08-29T06:21:54.420