1

我有这种情况:

var tasks = new List<ITask> ...
Parallel.ForEach(tasks, currentTask => currentTask.Execute() );

是否可以指示 PLinq 在生成下一个线程之前等待 500 毫秒?

System.Threading.Thread.Sleep(5000);
4

4 回答 4

7

Parallel.Foreach完全错误地使用了,您应该制作一个特殊的枚举器,将其速率限制为每 500 毫秒获取一次数据。

由于您没有提供任何细节,我对您的DTO的工作方式做了一些假设。

private IEnumerator<SomeResource> GetRateLimitedResource()
{
    SomeResource someResource = null;
    do
    {
        someResource = _remoteProvider.GetData();

        if(someResource != null)
        {
             yield return someResource;
             Thread.Sleep(500);
        }
    } while (someResource != null);
}

这就是你的并行应该是什么样子

Parallel.ForEach(GetRateLimitedResource(), SomeFunctionToProcessSomeResource);
于 2013-07-15T14:00:54.903 回答
3

已经有一些好的建议了。我同意其他人的观点,即您以不打算使用的方式使用 PLINQ。

我的建议是使用System.Threading.Timer。这可能比编写一个返回一个IEnumerable<>强制半秒延迟的方法更好,因为您可能不需要等待整整半秒,具体取决于自上次 API 调用以来已经过去了多少时间。

使用计时器,它将在您指定的时间间隔内调用您提供的委托,因此即使第一个任务未完成,半秒后它也会在另一个线程上调用您的委托,因此不会是任何额外的等待。

从您的示例代码中,听起来您有一个任务列表,在这种情况下,我将使用System.Collections.Concurrent.ConcurrentQueue来跟踪任务。一旦队列为空,关闭计时器。

于 2013-07-15T14:07:12.240 回答
2

You could use Enumerable.Aggregate instead.

var task = tasks.Aggregate((t1, t2) =>
                                t1.ContinueWith(async _ =>
                                    { Thread.Sleep(500); return t2.Result; }));

If you don't want the tasks chained then there is also the overload to Select assuming the tasks are in order of delay.

var tasks = Enumerable
              .Range(1, 10)
              .Select(x => Task.Run(() => x * 2))
              .Select((x, i) => Task.Delay(TimeSpan.FromMilliseconds(i * 500))
                                    .ContinueWith(_ => x.Result));

foreach(var result in tasks.Select(x => x.Result))
{
    Console.WriteLine(result);
}

From the comments a better options would be to guard the resource instead of using the time delay.

static object Locker = new object();

static int GetResultFromResource(int arg)
{
    lock(Locker)
    {
        Thread.Sleep(500);
        return arg * 2;
    }
}

var tasks = Enumerable
          .Range(1, 10)
          .Select(x => Task.Run(() => GetResultFromResource(x)));

foreach(var result in tasks.Select(x => x.Result))
{
    Console.WriteLine(result);
}
于 2013-07-15T13:34:48.570 回答
1

在这种情况下,带有 的 Producer-Consumer 模式怎么样BlockingCollection<T>

var tasks = new BlockingCollection<ITask>();

// add tasks, if this is an expensive process, put it out onto a Task
// tasks.Add(x);

// we're done producin' (allows GetConsumingEnumerable to finish)
tasks.CompleteAdding();

RunTasks(tasks);

使用单个消费者线程:

static void RunTasks(BlockingCollection<ITask> tasks)
{
    foreach (var task in tasks.GetConsumingEnumerable())
    {
        task.Execute();

        // this may not be as accurate as you would like
        Thread.Sleep(500);
    }
}

如果您有权访问 .Net 4.5,您可以使用Task.Delay

static void RunTasks(BlockingCollection<ITask> tasks)
{
    foreach (var task in tasks.GetConsumingEnumerable())
    {
        Task.Delay(500)
            .ContinueWith(() => task.Execute())
            .Wait();
    }
}
于 2013-07-15T14:01:41.493 回答