我有这种情况:
var tasks = new List<ITask> ...
Parallel.ForEach(tasks, currentTask => currentTask.Execute() );
是否可以指示 PLinq 在生成下一个线程之前等待 500 毫秒?
System.Threading.Thread.Sleep(5000);
您Parallel.Foreach
完全错误地使用了,您应该制作一个特殊的枚举器,将其速率限制为每 500 毫秒获取一次数据。
由于您没有提供任何细节,我对您的DTO的工作方式做了一些假设。
private IEnumerator<SomeResource> GetRateLimitedResource()
{
SomeResource someResource = null;
do
{
someResource = _remoteProvider.GetData();
if(someResource != null)
{
yield return someResource;
Thread.Sleep(500);
}
} while (someResource != null);
}
这就是你的并行应该是什么样子
Parallel.ForEach(GetRateLimitedResource(), SomeFunctionToProcessSomeResource);
已经有一些好的建议了。我同意其他人的观点,即您以不打算使用的方式使用 PLINQ。
我的建议是使用System.Threading.Timer。这可能比编写一个返回一个IEnumerable<>
强制半秒延迟的方法更好,因为您可能不需要等待整整半秒,具体取决于自上次 API 调用以来已经过去了多少时间。
使用计时器,它将在您指定的时间间隔内调用您提供的委托,因此即使第一个任务未完成,半秒后它也会在另一个线程上调用您的委托,因此不会是任何额外的等待。
从您的示例代码中,听起来您有一个任务列表,在这种情况下,我将使用System.Collections.Concurrent.ConcurrentQueue来跟踪任务。一旦队列为空,关闭计时器。
You could use Enumerable.Aggregate
instead.
var task = tasks.Aggregate((t1, t2) =>
t1.ContinueWith(async _ =>
{ Thread.Sleep(500); return t2.Result; }));
If you don't want the tasks chained then there is also the overload to Select
assuming the tasks are in order of delay.
var tasks = Enumerable
.Range(1, 10)
.Select(x => Task.Run(() => x * 2))
.Select((x, i) => Task.Delay(TimeSpan.FromMilliseconds(i * 500))
.ContinueWith(_ => x.Result));
foreach(var result in tasks.Select(x => x.Result))
{
Console.WriteLine(result);
}
From the comments a better options would be to guard the resource instead of using the time delay.
static object Locker = new object();
static int GetResultFromResource(int arg)
{
lock(Locker)
{
Thread.Sleep(500);
return arg * 2;
}
}
var tasks = Enumerable
.Range(1, 10)
.Select(x => Task.Run(() => GetResultFromResource(x)));
foreach(var result in tasks.Select(x => x.Result))
{
Console.WriteLine(result);
}
在这种情况下,带有 的 Producer-Consumer 模式怎么样BlockingCollection<T>
?
var tasks = new BlockingCollection<ITask>();
// add tasks, if this is an expensive process, put it out onto a Task
// tasks.Add(x);
// we're done producin' (allows GetConsumingEnumerable to finish)
tasks.CompleteAdding();
RunTasks(tasks);
使用单个消费者线程:
static void RunTasks(BlockingCollection<ITask> tasks)
{
foreach (var task in tasks.GetConsumingEnumerable())
{
task.Execute();
// this may not be as accurate as you would like
Thread.Sleep(500);
}
}
如果您有权访问 .Net 4.5,您可以使用Task.Delay
:
static void RunTasks(BlockingCollection<ITask> tasks)
{
foreach (var task in tasks.GetConsumingEnumerable())
{
Task.Delay(500)
.ContinueWith(() => task.Execute())
.Wait();
}
}