-2

我们有一个异步任务调度系统,它使用 golang 的 exec.Command 来执行 php 脚本。每次,调度程序从消息队列中获取一个任务,它会创建一个新的 exec.Command 来执行该任务。

有时,几乎有数千个任务需要同时执行。在这种情况下,调度程序将创建数千个 exec.Command,然后在一段时间后将它们销毁。

我想知道是否有办法创建一个进程池(或类似的东西),以便我们可以重用 exec.Command 来降低创建新子进程的成本。

ps:我注意到exec.Command在调用运行后不能重用。

UPATE 当前的逻辑如下:

 func nsqMessageHandler(msg *nsq.Message) error{

      var task Task
      err:= json.Unmarshal(msg.Body,&task)
      ....
      cmd:=exec.Command("php",task.Payload...)
      err:=cmd.Start()
      ....
      err= cmd.Run()
      err=cmd.Process.Kill()
      ...
 }
4

1 回答 1

1

绝对的。您可以创建一个将作业提交到的阻塞通道。然后,您可以通过生成将 job<-channel 作为输入并具有例如输出通道来发布进度或结果的goroutines来创建一组“workers” 。工人现在只是在频道上阅读,一旦其中一个找到工作,它就会工作。

你想防止工人一开始就关闭,所以你需要一些阻止它们的方法。解决此问题的一种方法是使用 awaitgroup并让工作人员在waitgroup其中一个关闭时将索引减一。此外,您想停止工作人员,但不能从外部停止 goroutine - 因此您必须实施一项工作,您可以将其传递给工作人员,迫使他们自己停止。

来自gobyexample.com的示例

// In this example we'll look at how to implement
// a _worker pool_ using goroutines and channels.

package main

import "fmt"
import "time"

// Here's the worker, of which we'll run several
// concurrent instances. These workers will receive
// work on the `jobs` channel and send the corresponding
// results on `results`. We'll sleep a second per job to
// simulate an expensive task.
func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {

    // In order to use our pool of workers we need to send
    // them work and collect their results. We make 2
    // channels for this.
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // This starts up 3 workers, initially blocked
    // because there are no jobs yet.
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // Here we send 5 `jobs` and then `close` that
    // channel to indicate that's all the work we have.
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs)

    // Finally we collect all the results of the work.
    for a := 1; a <= 5; a++ {
        <-results
    }
}
于 2018-08-16T09:43:21.777 回答