我在.Net 中使用异步等待。如何限制并发异步调用的数量?
4 回答
一种相对简单的方法是(ab)使用 TPL 数据流。就像是:
public IEnumerable<TOutput> AsyncThrottle<TInput, TOutput>(
IEnumerable<TInput> inputs, Func<TInput, Task<TOutput>> asyncFunction,
int maxDegreeOfParallelism)
{
var outputs = new ConcurrentQueue<TOutput>();
var block = new ActionBlock<TInput>(
async x => outputs.Enqueue(await asyncFunction(x)),
new ExecutionDataflowBlockOptions
{ MaxDgreeOfParallelism = maxDegreeOfParallelism });
foreach (var input in inputs)
block.Send(input);
block.Complete();
block.Completion.Wait();
return outputs.ToArray();
}
注意:我把它留在这里作为遗产。不要这样做,因为WhenAny
同时等待的任务太多。堆栈会变深。
基于 Stephen Toub 的这段代码:
const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
while(imageTasks.Count > 0)
{
try
{
Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);
imageTasks.Remove(imageTask);
Bitmap image = await imageTask;
panel.AddImage(image);
}
catch(Exception exc) { Log(exc); }
if (nextIndex < urls.Length)
{
imageTasks.Add(GetBitmapAsync(urls[nextIndex]));
nextIndex++;
}
}
我写了这个:
Private ThrottleGroups As New Dictionary(Of Object, List(Of Task))
Public Async Function ThrottleAsync(Of TResult)(ByVal f As Func(Of Task(Of TResult)), GroupId As Object, MaxCount As Integer) As Task(Of TResult)
If Not ThrottleGroups.ContainsKey(GroupId) Then
ThrottleGroups.Add(GroupId, New List(Of Task))
End If
If ThrottleGroups(GroupId).Count < MaxCount Then
Dim NewTask As Task(Of TResult) = f()
ThrottleGroups(GroupId).Add(NewTask)
Return Await NewTask
Else
Dim FinishedTask As Task = Await Task.WhenAny(ThrottleGroups(GroupId))
ThrottleGroups(GroupId).Remove(FinishedTask)
Return Await ThrottleAsync(f, GroupId, MaxCount)
End If
End Function
要使用,只需替换:
ExampleTaskAsync(param1, param2)
和:
Dim f As Func(Of Task(Of Integer))
f = Function()
Return ExampleAsync(param1, param2)
End Function
Const CONCURRENT_TASKS As Integer = 4
Return ThrottleAsync(f, "ExampleAsync", CONCURRENT_TASKS)
请注意,我们必须将对任务的调用包装在一个函数中f
,否则我们将已经启动任务。ThrottleAsync 的第二个参数是标识“组”的任何对象;我用了一个字符串。同一“组”中的所有异步任务都仅限于CONCURRENT_TASKS
任务,在本例中为 4。
下面的示例代码显示了一次只运行四个线程。 All Ready!
立即显示,因为子例程是异步的。此外,即使线程无序开始或结束,“输出”行仍将与输入的顺序相同。
Dim results As New List(Of Task(Of Integer))
For i As Integer = 0 To 20
Dim j As Integer = i
Dim f As Func(Of Task(Of Integer))
f = Function() As Task(Of Integer)
Return Task.Run(Function() As Integer
Debug.WriteLine(DateTime.Now & "Starting " & j)
System.Threading.Thread.Sleep(5000)
Debug.WriteLine(DateTime.Now & "Ending " & j)
Return j
End Function)
End Function
Const CONCURRENT_UPLOADS As Integer = 4
results.Add(ThrottleAsync(f, "PutOjbectAsync", CONCURRENT_UPLOADS))
Next
Debug.WriteLine("all ready!")
For Each x As Task(Of Integer) In results
Debug.WriteLine(DateTime.Now & "Output: " & Await x)
Next
根据代码,最简单的方法可能是使用 Parallel.For(Each) 并在并行选项中指定最大并行度。
我更喜欢这种技术。我TaskCompletionSource
用来为传入任务创建输出任务。Task
这是必要的,因为我什至想在运行它之前返回 a !下面的类将每个输入Func(of Task(of Object))
与TaskCompletionSource
立即返回的 a 相关联,并将它们放入队列中。
队列中的元素被出列到正在运行的任务列表中,并且继续设置TaskCompletionSource
. 对 in 循环的调用WhenAny
确保在空间释放时将元素从队列移动到运行列表。还有一个检查以确保一次不超过一个WhenAny
,尽管它可能存在并发问题。
要使用,只需像这样替换同步函数:
Task.Run(AddressOf MySyncFunction) 'possibly many of these
有了这个:
Dim t1 As New Throttler(4)
t1.Run(AddressOf MySyncFunction) 'many of these, but only 4 will run at a time.
对于已经返回 Task 的函数,重要的是要将它们转换为返回 Task 的函数,以便 tottler 可以运行它们。代替:
NewTask = MyFunctionAsync()
和:
NewTask = t1.Run(Function () return MyFunctionAsync())
下面的类还为 Throttler.Run() 实现了许多不同的签名,具体取决于函数是同步/异步、有/没有输入、有/没有输出。将任务转换为任务(输出)特别棘手!
Class Throttler
Property MaxCount As Integer
Sub New(Optional MaxCount As Integer = 1)
Me.MaxCount = MaxCount
End Sub
Private Running As New List(Of Task)
Private Waiting As New Concurrent.ConcurrentQueue(Of System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)))
Private AlreadyWaiting As Boolean
Async Sub MakeWaiter()
If AlreadyWaiting Then Exit Sub
AlreadyWaiting = True
Do While Waiting.Count > 0
Dim CurrentWait As System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object)) = Nothing
Do While Running.Count < MaxCount AndAlso Waiting.TryDequeue(CurrentWait)
Dim NewFunc As Func(Of Task(Of Object)) = CurrentWait.Item1
Dim NewTask As Task(Of Object) = NewFunc()
Dim CurrentTcs As TaskCompletionSource(Of Object) = CurrentWait.Item2
NewTask.ContinueWith(Sub(t2 As Task(Of Object))
CurrentTcs.SetResult(t2.Result)
End Sub)
Running.Add(NewTask)
Loop
If Waiting.Count > 0 Then
Dim Waiter As Task(Of Task)
Waiter = Task.WhenAny(Running)
Dim FinishedTask As Task = Await Waiter
Await FinishedTask
Running.Remove(FinishedTask)
End If
Loop
AlreadyWaiting = False
End Sub
Function Run(f As Func(Of Task(Of Object))) As Task(Of Object)
Dim NewTcs As New TaskCompletionSource(Of Object)
Waiting.Enqueue(New System.Tuple(Of Func(Of Task(Of Object)), TaskCompletionSource(Of Object))(f, NewTcs))
MakeWaiter()
Return NewTcs.Task
End Function
Function Run(Of TInput)(f As Func(Of TInput, Task), input As TInput) As Task
Dim NewF As Func(Of Task)
NewF = Function() As Task
Return f(input)
End Function
Return Me.Run(NewF)
End Function
Function Run(Of TInput)(f As Func(Of TInput, Task(Of Object)), input As TInput) As Task(Of Object)
Dim NewF As Func(Of Task(Of Object))
NewF = Function() As Task(Of Object)
Return f(input)
End Function
Return CType(Me.Run(NewF), Task(Of Object))
End Function
Function Run(f As Func(Of Task)) As Task
Dim NewF As Func(Of Task(Of Object))
NewF = Function() As Task(Of Object)
Return f().ContinueWith(Function(t As task) As Object
Return Nothing
End Function)
End Function
Return CType(Me.Run(NewF), Task(Of Object))
End Function
Function Run(Of TInput)(f As Func(Of TInput, Object), input As TInput) As Task(Of Object)
Dim NewF As Func(Of Task(Of Object))
NewF = Function() As Task(Of Object)
Return Task.Run(Function() As Object
Return f(input)
End Function)
End Function
Return CType(Me.Run(NewF), Task(Of Object))
End Function
Function Run(Of TInput)(f As Action(Of TInput), input As TInput) As Task
Dim NewF As Func(Of Task)
NewF = Function() As Task
Return Task.Run(Sub()
f(input)
End Sub)
End Function
Return Me.Run(NewF)
End Function
Function Run(f As Func(Of Object)) As Task(Of Object)
Dim NewF As Func(Of Task(Of Object))
NewF = Function() As Task(Of Object)
Return Task.Run(Function()
Return f()
End Function)
End Function
Return CType(Me.Run(NewF), Task(Of Object))
End Function
Function Run(f As Action) As Task
Dim NewF As Func(Of Task)
NewF = Function() As Task
Return Task.Run(Sub()
f()
End Sub)
End Function
Return Me.Run(NewF)
End Function
End Class