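I have written a very general-purpose function for you, and unlike the other answers, you don't need to restructure your code to make it work.
Just pass your function as a parameter to Async and pipe your input in. Each item on the pipeline runs your scriptblock asynchronously and in parallel, and the results are emitted as each item completes.
For your question specifically, it would look something like this: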
@(
@{vmxFilePath='a';machineName='b';username='c';password='d';scriptTpath='e';scriptFile='f';uacDismissScript='g';snapshotName='h'},
@{vmxFilePath='i';machineName='j';username='k';password='l';scriptTpath='m';scriptFile='n';uacDismissScript='o';snapshotName='p'}
...
) `
| Async `
-Func { Process {
Execute $_.vmxFilePath $_.machineName $_.username $_.password $_.scriptTpath $_.scriptFile $_.uacDismissScript $_.snapshotName
} }
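Best of all, my function supports not only the automatic construction of [powershell] instances (as in @binarySalt's answer), but also Jobs (as used in @Joost's answer, but don't use them, because they are much slower than runspaces) and Tasks, in case you are using someone else's code that already spawns them (use the -AsJob flag, which I explain at the bottom of this answer).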
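For readers who have not used runspaces directly, here is a minimal sketch of the pattern that Async automates: create a pool, start each task with BeginInvoke, and collect each result with EndInvoke. The names $work and $pending and the pool size of 4 are only illustrative, and this sketch leaves out the throttling, progress reporting and streaming output that the full function adds.

```powershell
# A pool that runs at most 4 runspaces at once (the size is arbitrary here).
$pool = [RunspaceFactory]::CreateRunspacePool(1, 4)
$pool.Open()

# Illustrative workload: pretend to be slow, then double the input.
$work = { Param($n) Start-Sleep -Milliseconds 200; $n * 2 }

# Start every task without waiting for the previous one to finish.
$pending = 1..4 | ForEach-Object {
    $pwsh = [powershell]::Create().AddScript($work).AddArgument($_)
    $pwsh.RunspacePool = $pool
    [PSCustomObject]@{ pwsh = $pwsh; handle = $pwsh.BeginInvoke() }
}

# Collect the results; EndInvoke blocks until that particular task is done.
$results = $pending | ForEach-Object {
    try { $_.pwsh.EndInvoke($_.handle) }
    finally { $_.pwsh.Dispose() }
}

$pool.Close()
$pool.Dispose()

$results   # 2, 4, 6, 8
```

Because all four tasks sleep at the same time, the whole thing should take roughly as long as one task rather than four.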
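So, that isn't much use to new visitors to this question. Let's do something more demonstrable that you can run on your own machine to see real-world results.
Take this simple code as an example. It just takes in some test data for a few websites and checks whether they are up.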
$in=TestData | ?{ $_.proto -eq 'tcp' }
$in `
| %{
$WarningPreference='SilentlyContinue'
$_ `
| Add-Member `
-PassThru `
-MemberType NoteProperty `
-Name result `
-Value $(Test-NetConnection `
-ComputerName $_.address `
-Port $_.port `
-InformationLevel Quiet
)
} `
| Timer -name 'normal' `
| Format-Table
Here is the test data, which is just a few of the same websites repeated.
There is also a timer function to see how well it performs.
Function TestData {
1..20 | %{
[PsCustomObject]@{proto='tcp' ; address='www.w3.org' ; port=443},
[PsCustomObject]@{proto='https'; address='www.w3.org' ; port=443},
[PsCustomObject]@{proto='icmp' ; address='www.w3.org' ; },
[PsCustomObject]@{proto='tcp' ; address='developer.mozilla.org' ; port=443},
[PsCustomObject]@{proto='https'; address='developer.mozilla.org' ; port=443},
[PsCustomObject]@{proto='icmp' ; address='developer.mozilla.org' ; },
[PsCustomObject]@{proto='tcp' ; address='help.dottoro.com' ; port=80 },
[PsCustomObject]@{proto='http' ; address='help.dottoro.com' ; port=80 },
[PsCustomObject]@{proto='icmp' ; address='help.dottoro.com' ; }
}
}
Function Timer {
Param ($name)
Begin {
$timer=[system.diagnostics.stopwatch]::StartNew()
}
Process { $_ }
End {
@(
$name,
' ',
[math]::Floor($timer.Elapsed.TotalMinutes),
':',
($timer.Elapsed.Seconds -replace '^(.)$','0$1')
) -join '' | Out-Host
}
}
OK, 15 seconds. So how much faster can this get if we use Async?
And how much do we need to change to get it to work?
$in=TestData | ?{ $_.proto -eq 'tcp' }
$in `
| Async `
-Expected $in.Count `
-Func { Process {
$WarningPreference='SilentlyContinue'
$_ `
| Add-Member `
-PassThru `
-MemberType NoteProperty `
-Name result `
-Value $(Test-NetConnection `
-ComputerName $_.address `
-Port $_.port `
-InformationLevel Quiet
)
} } `
| Timer -name 'async' `
| Format-Table
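It looks basically identical..
OK, so what is the speed?
Wow, cut by two thirds!
Not only that, but because we know how many items are in the pipeline, I wrote in some smarts to give you a progress bar and an ETA.
Don't believe me? There is a video.
Or run the code yourself :)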
#Requires -Version 5.1
#asynchronously run a pool of tasks,
#and aggregate the results back into a synchronous output
#without waiting to pool all input before seeing the first result
Function Async { Param(
#maximum permitted simultaneous background tasks
[int]$BatchSize=[int]$env:NUMBER_OF_PROCESSORS * 3,
#the task that accepts input on a pipe to execute in the background
[scriptblock]$Func,
#because your task is in a subshell you won't have access to your outer scope,
#you may pass them in here
[array]$ArgumentList=@(),
[System.Collections.IDictionary]$Parameters=@{},
#the title of the progress bar
[string]$Name='Processing',
#your -Func may return a [Job] instead of being backgrounded itself,
#if so it must return @(job;input;args)
#optionally job may be a [scriptblock] to be backgrounded, or a [Task]
[switch]$AsJob,
#if you know the number of tasks ahead of time,
#providing it here will have the progress bar show an ETA
[int]$Expected,
#outputs of this stream will be @(job;input) where job is the result
[switch]$PassThru,
#the time it takes to give up on one job type if there are others waiting
[int]$Retry=5
)
Begin {
$ArgumentList=[Array]::AsReadOnly($ArgumentList)
$Parameters=$Parameters.GetEnumerator() `
| &{
Begin { $params=[ordered]@{} }
Process { $params.Add($_.Key, $_.Value) }
End { $params.AsReadOnly() }
}
#the currently running background tasks
$running=@{}
$counts=[PSCustomObject]@{
completed=0;
jobs=0;
tasks=0;
results=0;
}
#a lazy attempt at uniquely IDing this instance for Write-Progress
$asyncId=Get-Random
#a timer for Write-Progress
$timer=[system.diagnostics.stopwatch]::StartNew()
$pool=[RunspaceFactory]::CreateRunspacePool(1, $BatchSize)
$pool.Open()
#called whenever we want to update the progress bar
Function Progress { Param($Reason)
#calculate ETA if applicable
$eta=-1
$total=[math]::Max(1, $counts.completed + $running.Count)
if ($Expected) {
$total=[math]::Max($total, $Expected)
if ($counts.completed) {
$eta=`
($total - $counts.completed) * `
$timer.Elapsed.TotalSeconds / `
$counts.completed
}
}
$Reason=Switch -regex ($Reason) {
'^done$' { "Finishing up the final $($running.Count) jobs." }
'^(do|next)$' { "
Running
$($running.Count)
jobs concurrently.
$(@('Adding','Waiting to add')[!($Reason -eq 'do')])
job #
$($counts.completed + $running.Count + 1)
" -replace '\r?\n\t*','' }
Default { "
Running $($running.Count) jobs concurrently.
Emitting
$($counts.completed)
$(@{1='st';2='nd';3='rd'}[$counts.completed % 10] -replace '^$','th')
result.
" -replace '\r?\n\t*','' }
}
Write-Progress `
-Id $asyncId `
-Activity $Name `
-SecondsRemaining $eta `
-Status ("
$($counts.completed)
jobs completed in
$([math]::Floor($timer.Elapsed.TotalMinutes))
:
$($timer.Elapsed.Seconds -replace '^(.)$','0$1')
" -replace '\r?\n\t*','') `
-CurrentOperation $Reason `
-PercentComplete (100 * $counts.completed / $total)
}
#called with the [Job]'s that have completed
Filter Done {
++$counts.completed
$out=$running.Item($_.Id)
$running.Remove($_.Id)
Progress
$out.job=`
if ($_ -is [System.Management.Automation.Job]) {
--$counts.jobs
$_ | Receive-Job
}
elseif ($_.pwsh) {
--$counts.results
try {
$_.pwsh.EndInvoke($_)
}
catch {
#[System.Management.Automation.MethodInvocationException]
$_.Exception.InnerException
}
finally {
$_.pwsh.Dispose()
}
}
elseif ($_.IsFaulted) {
--$counts.tasks
#[System.AggregateException]
$_.Exception.InnerException
}
else {
--$counts.tasks
$_.Result
}
if ($PassThru) {
$out
}
else {
$out.job
}
}
$isJob={
$_ -is [System.Management.Automation.Job]
}
$isTask={
$_ -is [System.Threading.Tasks.Task]
}
$isResult={
$_ -is [IAsyncResult]
}
$isFinished={
$_.IsCompleted -or `
(
$_.JobStateInfo.State -gt 1 -and
$_.JobStateInfo.State -ne 6 -and
$_.JobStateInfo.State -ne 8
)
}
$handle={
$_.AsyncWaitHandle
}
Function Jobs { Param($Filter)
$running.Values | %{ $_.job } | ? $Filter
}
#called whenever we need to wait for at least one task to complete
#outputs the completed tasks
Function Wait { Param([switch]$Finishing)
#if we are at the max background tasks this instant
while ($running.Count -ge $BatchSize) {
Progress -Reason @('done','next')[!$Finishing]
$value=@('jobs', 'tasks', 'results') `
| %{ $counts.($_) } `
| measure -Maximum -Sum
$wait=if ($value.Maximum -lt $value.Sum) {
$Retry
}
else {
-1
}
$value=Switch -exact ($value.Maximum) {
$counts.jobs {
(Wait-Job `
-Any `
-Job (Jobs -Filter $isJob) `
-Timeout $wait
).Count -lt 1
break
}
Default {
[System.Threading.WaitHandle]::WaitAny(
(Jobs -Filter $handle | % $handle),
[math]::Max($wait * 1000, -1)
) -eq [System.Threading.WaitHandle]::WaitTimeout
break
}
}
(Jobs -Filter $isFinished) | Done
}
}
}
#accepts inputs to spawn a new background task with
Process {
Wait
Progress -Reason 'do'
$run=[PSCustomObject]@{
input=$_;
job=$Func;
args=$ArgumentList;
params=$Parameters;
}
if ($AsJob) {
$run.job=$NULL
Invoke-Command `
-ScriptBlock $Func `
-ArgumentList @($run) `
| Out-Null
}
if ($run.job | % $isJob) {
++$counts.jobs
}
elseif ($run.job | % $isTask) {
++$counts.tasks
}
#if we weren't given a [Job] we need to spawn it for them
elseif ($run.job -is [ScriptBlock]) {
$pwsh=[powershell]::Create().AddScript($run.job)
$run.args | %{ $pwsh.AddArgument($_) } | Out-Null
$pwsh.RunspacePool=$pool
$run.job=$pwsh.AddParameters($run.params).BeginInvoke(
[System.Management.Automation.PSDataCollection[PSObject]]::new(
[PSObject[]]($run.input)
)
)
$run.job | Add-Member `
-MemberType NoteProperty `
-Name pwsh `
-Value $pwsh `
-PassThru `
| Add-Member `
-MemberType NoteProperty `
-Name Id `
-Value $run.job.AsyncWaitHandle.Handle.ToString()
++$counts.results
}
else {
throw "$($run.job.GetType()) needs to be a ScriptBlock"
}
$running.Add($run.job.Id, $run) | Out-Null
}
End {
#wait for the remaining running processes
$BatchSize=1
Wait -Finishing
Write-Progress -Id $asyncId -Activity $Name -Completed
$pool.Close()
$pool.Dispose()
}
}
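The ETA shown by the Progress helper above is a plain linear extrapolation: the remaining item count multiplied by the average time per completed item. Here is the same arithmetic as a standalone sketch. The name Get-Eta is hypothetical and is not part of Async. As in Progress, -1 is returned when nothing has completed yet, so the estimate is left unset.

```powershell
# Hypothetical helper mirroring the ETA arithmetic used in Progress.
Function Get-Eta { Param(
    [int]$Expected,          # total number of items, as passed to -Expected
    [int]$Completed,         # items finished so far
    [double]$ElapsedSeconds  # time since the first item was started
)
    # No completed items means no average to extrapolate from.
    if ($Completed -lt 1) { return -1 }
    ($Expected - $Completed) * $ElapsedSeconds / $Completed
}

# 20 of 60 items done after 2 seconds: 40 items left at 0.1 s each.
Get-Eta -Expected 60 -Completed 20 -ElapsedSeconds 2   # 4
```

The estimate assumes every item takes about the same time, so it will drift when a few slow items dominate the run.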
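So you may have noticed three things above: I alluded to -AsJob (using a mix of Jobs, Tasks and scriptblocks), there are unused protocols in the test data, and the video had a third test in it.
Here it is. Instead of doing just the basic tcp test, we will use the test data to also do an http/s check and an icmp ping (idk, maybe the https check fails, and you want to narrow down whether that is because the machine is down or just the service).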
$in=TestData
$in `
| Async `
-Expected $in.Count `
-PassThru `
-AsJob `
<#this would be accessible as a named parameter if needed#>`
-Parameters @{proxy=[System.Net.WebRequest]::GetSystemWebProxy()} `
-Func { Param([parameter(Position=0)]$x)
$x.job=Switch -regex ($x.input.proto) {
'^icmp$' {
Test-Connection `
-ComputerName $x.input.address `
-Count 1 `
-ThrottleLimit 1 `
-AsJob
}
'^tcp$' {
$x.params=@{address=$x.input.address; port=$x.input.port}
{ Param($address, $port)
$WarningPreference='SilentlyContinue'
Test-NetConnection `
-ComputerName $address `
-Port $port `
-InformationLevel Quiet
}
}
'^(http|https)$' {
[Net.ServicePointManager]::SecurityProtocol=[Net.SecurityProtocolType]::Tls12
$request=[System.Net.HttpWebRequest]::Create((@(
$x.input.proto,
'://',
$x.input.address,
':',
$x.input.port
) -join ''))
$request.Proxy=$NULL
$request.Method='Get'
$request.GetResponseAsync()
}
}
} `
| %{
$result=$_
$result.input `
| Add-Member `
-PassThru `
-MemberType NoteProperty `
-Name result `
-Value $(Switch -regex (@($result.input.proto, $result.job.message)[$result.job -is [Exception]]) {
#[Win32_PingStatus]
'^icmp$' { $result.job.StatusCode -eq 0 }
#[bool]
'^tcp$' { $result.job }
#[System.Net.HttpWebResponse]
'^(http|https)$' {
$result.job.Close()
Switch ($result.job.StatusCode.value__) {
{ $_ -ge 200 -and $_ -lt 400 } { $True }
Default {$False}
}
}
#[Exception]
Default { $False }
})
} `
| Timer -name 'async asjob' `
| Format-Table
As you may have seen, this did more than double the work of the original code, but still finished in around half the time, at 8 seconds.
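The -Parameters and -ArgumentList options exist because each background scriptblock runs in its own runspace, which cannot see variables from the calling scope. The following sketch shows that isolation on its own, without Async. The variable $greeting and the other names are only illustrative.

```powershell
$greeting = 'hello'

# A fresh runspace does not inherit the caller's variables,
# so $greeting expands to nothing inside it.
$isolated = [powershell]::Create()
$without = $isolated.AddScript({ "[$greeting]" }).Invoke()
$isolated.Dispose()

# Passing the value explicitly as a named parameter makes it available.
$explicit = [powershell]::Create()
$with = $explicit.AddScript({ Param($greeting) "[$greeting]" }).AddParameters(@{ greeting = $greeting }).Invoke()
$explicit.Dispose()

$without[0]   # []
$with[0]      # [hello]
```

With Async the equivalent would be to declare a Param block in your -Func and supply the values through -Parameters, as the proxy example above hints at.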