70

TL;DR:请直接到最后一部分,告诉我你将如何解决这个问题。

今天早上我开始使用来自 Python 的 Go。我想用不同的命令行参数多次从 Go 调用一个封闭源代码的可执行文件,具有一点并发性。我生成的代码运行良好,但我想获得您的意见以改进它。由于我处于早期学习阶段,我还将解释我的工作流程。

为简单起见,这里假设这个“外部闭源程序”是zenity一个 Linux 命令行工具,可以从命令行显示图形消息框。

从 Go 调用可执行文件

所以,在 Go 中,我会这样:

package main
import "os/exec"
func main() {
    cmd := exec.Command("zenity", "--info", "--text='Hello World'")
    cmd.Run()
}

这应该工作得很好。请注意,.Run()它的功能等同.Start().Wait(). 这很好,但如果我只想执行这个程序一次,整个编程的东西就不值得了。所以让我们多次这样做。

多次调用可执行文件

现在我有了这个工作,我想多次调用我的程序,使用自定义命令行参数(这里只是i为了简单起见)。

package main    
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 // Number of times the external program is called
    for i:=0; i<NumEl; i++ {
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
}

好的,我们做到了!但是我还是看不出 Go 相对于 Python 的优势……这段代码实际上是以串行方式执行的。我有一个多核 CPU,我想利用它。所以让我们用 goroutines 添加一些并发性。

Goroutines,或者一种使我的程序并行的方法

a)第一次尝试:只需在任何地方添加“go”

让我们重写我们的代码,使事情更容易调用和重用,并添加著名的go关键字:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 
    for i:=0; i<NumEl; i++ {
        go callProg(i)  // <--- There!
    }
}

func callProg(i int) {
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

没有什么!问题是什么?所有的 goroutines 都会立即执行。我真的不知道为什么没有执行zenity,但是AFAIK,Go程序在zenity外部程序甚至可以初始化之前就退出了。使用time.Sleep: 等待几秒钟就足以让 8 个 zenity 实例自行启动。我不知道这是否可以被认为是一个错误。

更糟糕的是,我实际上想要调用的真正程序需要一段时间才能执行。如果我在我的 4 核 CPU 上并行执行该程序的 8 个实例,会浪费一些时间进行大量上下文切换……我不知道普通 Go goroutine 的行为如何,但exec.Command 在 8 个不同的线程中启动 zenity 8 次. 更糟糕的是,我想执行这个程序超过 100,000 次。在 goroutines 中一次完成所有这些工作根本不会有效。不过,我想利用我的 4 核 CPU!

b) 第二次尝试:使用 goroutine 池

在线资源倾向于推荐sync.WaitGroup用于此类工作。这种方法的问题在于,您基本上是在处理一批 goroutine:如果我创建了 4 个成员的 WaitGroup,Go 程序将等待所有4 个外部程序完成,然后再调用新的一批 4 个程序。这效率不高:CPU 又一次被浪费了。

其他一些资源建议使用缓冲通道来完成这项工作:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    for i:=0; i<NumEl; i++ {
        go callProg(i, c)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
}

func callProg(i int, c chan bool) {
    defer func () {<- c}()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

这看起来很丑陋。频道不是为此目的而设计的:我正在利用副作用。我喜欢这个概念,defer但我讨厌必须声明一个函数(甚至是一个 lambda)才能从我创建的虚拟通道中弹出一个值。哦,当然,使用虚拟通道本身就是丑陋的。

c) 第三次尝试:当所有孩子都死了时死

现在我们快完成了。我只需要考虑另一个副作用:Go 程序在所有 zenity 弹出窗口关闭之前关闭。这是因为当循环结束时(第 8 次迭代),没有什么能阻止程序结束。这一次,sync.WaitGroup将是有用的。

package main
import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    wg := new(sync.WaitGroup)
    wg.Add(NumEl)            // Set the number of goroutines to (0 + NumEl)
    for i:=0; i<NumEl; i++ {
        go callProg(i, c, wg)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
    wg.Wait() // Wait for all the children to die
    close(c)
}

func callProg(i int, c chan bool, wg *sync.WaitGroup) {
    defer func () {
        <- c
        wg.Done() // Decrease the number of alive goroutines
    }()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

完毕。

我的问题

  • 你知道任何其他适当的方法来限制一次执行的 goroutines 的数量吗?

我不是指线程;Go 内部如何管理 goroutine 无关紧要。我的意思是限制一次启动的 goroutine 的数量:exec.Command每次调用时都会创建一个新线程,所以我应该控制它被调用的次数。

  • 这段代码对你来说看起来不错吗?
  • 您知道在这种情况下如何避免使用虚拟通道吗?

我无法说服自己这样的虚拟通道是要走的路。

4

5 回答 5

89

我会生成 4 个从公共通道读取任务的工作 goroutine。比其他人更快的 Goroutines(因为它们的调度方式不同或碰巧得到简单的任务)将从这个通道接收到比其他人更多的任务。除此之外,我会使用一个sync.WaitGroup来等待所有工作人员完成。剩下的部分只是任务的创建。您可以在此处查看该方法的示例实现:

package main

import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    tasks := make(chan *exec.Cmd, 64)

    // spawn four worker goroutines
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            for cmd := range tasks {
                cmd.Run()
            }
            wg.Done()
        }()
    }

    // generate some tasks
    for i := 0; i < 10; i++ {
        tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
    }
    close(tasks)

    // wait for the workers to finish
    wg.Wait()
}

可能还有其他可能的方法,但我认为这是一个非常干净且易于理解的解决方案。

于 2013-08-23T15:39:07.423 回答
34

一种简单的节流方法(执行f()N 次,但最多maxConcurrency同时执行),只是一个方案:

package main

import (
        "sync"
)

const maxConcurrency = 4 // for example

var throttle = make(chan int, maxConcurrency)

func main() {
        const N = 100 // for example
        var wg sync.WaitGroup
        for i := 0; i < N; i++ {
                throttle <- 1 // whatever number
                wg.Add(1)
                go f(i, &wg, throttle)
        }
        wg.Wait()
}

func f(i int, wg *sync.WaitGroup, throttle chan int) {
        defer wg.Done()
        // whatever processing
        println(i)
        <-throttle
}

操场

我可能不会称throttle频道为“虚拟”。恕我直言,这是一种优雅的方式(当然这不是我的发明),如何限制并发。

顺便说一句:请注意,您忽略了从cmd.Run().

于 2013-08-23T14:33:57.003 回答
2

试试这个: https ://github.com/korovkin/limiter

 limiter := NewConcurrencyLimiter(10)
 limiter.Execute(func() {
        zenity(...) 
 })
 limiter.Wait()
于 2017-05-13T18:39:32.533 回答
1

模块


模板

package main

import (
    "fmt"
    "github.com/zenthangplus/goccm"
    "math/rand"
    "runtime"
)

func main() {
    semaphore := goccm.New(runtime.NumCPU())
    
    for {
        semaphore.Wait()

        go func() {
            fmt.Println(rand.Int())
            semaphore.Done()
        }()
    }
    
    semaphore.WaitAllDone()
}

最佳常规数量

  • 如果操作受 CPU 限制:runtime.NumCPU()
  • 否则测试: time go run *.go

配置

export GOPATH="$(pwd)/gopath"
go mod init *.go
go mod tidy

清理

find "${GOPATH}" -exec chmod +w {} \;
rm --recursive --force "${GOPATH}"

于 2021-05-18T07:51:54.637 回答
0

您可以使用本文中描述的 Worker Pool 模式。这就是实现的样子......

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 
    pool := 4
    intChan := make(chan int)


    for i:=0; i<pool; i++ {
        go callProg(intChan)  // <--- launch the worker routines
    }

    for i:=0;i<NumEl;i++{
        intChan <- i        // <--- push data which will be received by workers
    }

    close(intChan) // <--- will safely close the channel & terminate worker routines
}

func callProg(intChan chan int) {
    for i := range intChan{
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
}
于 2021-07-23T06:19:56.443 回答