5

我编写了一个 API,可以进行 DB 调用并执行一些业务逻辑。我正在调用一个必须在后台执行某些操作的 goroutine。由于 API 调用不应该等待这个后台任务完成,所以我在调用 goroutine 后立即返回 200 OK(让我们假设后台任务永远不会给出任何错误。)

我读到 goroutine 将在 goroutine 完成其任务后终止。这种火灾和遗忘方式对 goroutine 泄漏安全吗?goroutine 在执行工作后是否会被终止并清理?

func DefaultHandler(w http.ResponseWriter, r *http.Request) {
    // Some DB calls
    // Some business logics
    go func() {
        // some Task taking 5 sec
    }()
    w.WriteHeader(http.StatusOK)
}
4

4 回答 4

4

您无需处理“goroutine 清理”,您只需启动 goroutine,当作为 goroutine 启动的函数返回时,它们将被清理。引用规范:Go 语句:

当函数终止时,它的 goroutine 也会终止。如果函数有任何返回值,它们会在函数完成时被丢弃。

所以你做的很好。但是请注意,您启动的 goroutine 不能使用或假设有关请求(r)和响应编写器(w)的任何内容,您只能在从处理程序返回之前使用它们。

另请注意,您不必编写http.StatusOK,如果您从处理程序返回而没有编写任何内容,则认为这是成功的,HTTP 200 OK并将自动发回。

查看相关/可能的重复项:Webhook 进程在另一个 goroutine 上运行

于 2021-07-13T09:21:21.543 回答
4

我建议始终控制你的 goroutine 以避免内存和系统耗尽。如果您收到大量请求并且开始不受控制地生成 goroutine,那么系统可能迟早会宕机。

在那些需要立即返回 200Ok 的情况下,最好的方法是创建一个消息队列,因此服务器只需要在队列中创建一个作业并返回 ok 并忘记。其余的将由消费者异步处理。

生产者(HTTP 服务器) >>> 队列 >>> 消费者

通常,队列是外部资源(RabbitMQ、AWS SQS...),但出于教学目的,您可以使用通道作为消息队列来实现相同的效果。

在示例中,您将看到我们如何创建一个通道来通信 2 个进程。然后我们启动将从通道读取的工作进程,然后使用将写入通道的处理程序启动服务器。

在发送 curl 请求时尝试使用缓冲区大小和工作时间。

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

/*
$ go run .

curl "http://localhost:8080?user_id=1"
curl "http://localhost:8080?user_id=2"
curl "http://localhost:8080?user_id=3"
curl "http://localhost:8080?user_id=....."

*/

func main() {

    queueSize := 10
    // This is our queue, a channel to communicate processes. Queue size is the number of items that can be stored in the channel
    myJobQueue := make(chan string, queueSize) // Search for 'buffered channels'

    // Starts a worker that will read continuously from our queue
    go myBackgroundWorker(myJobQueue)

    // We start our server with a handler that is receiving the queue to write to it
    if err := http.ListenAndServe("localhost:8080", myAsyncHandler(myJobQueue)); err != nil {
        panic(err)
    }
}

func myAsyncHandler(myJobQueue chan<- string) http.HandlerFunc {
    return func(rw http.ResponseWriter, r *http.Request) {
        // We check that in the query string we have a 'user_id' query param
        if userID := r.URL.Query().Get("user_id"); userID != "" {
            select {
            case myJobQueue <- userID: // We try to put the item into the queue ...
                rw.WriteHeader(http.StatusOK)
                rw.Write([]byte(fmt.Sprintf("queuing user process: %s", userID)))
            default: // If we cannot write to the queue it's because is full!
                rw.WriteHeader(http.StatusInternalServerError)
                rw.Write([]byte(`our internal queue is full, try it later`))
            }
            return
        }
        rw.WriteHeader(http.StatusBadRequest)
        rw.Write([]byte(`missing 'user_id' in query params`))
    }
}

func myBackgroundWorker(myJobQueue <-chan string) {
    const (
        jobDuration = 10 * time.Second // simulation of a heavy background process
    )

    // We continuosly read from our queue and process the queue 1 by 1.
    // In this loop we could spawn more goroutines in a controlled way to paralelize work and increase the read throughput, but i don't want to overcomplicate the example.
    for userID := range myJobQueue {
        // rate limiter here ...
        // go func(u string){
        log.Printf("processing user: %s, started", userID)
        time.Sleep(jobDuration)
        log.Printf("processing user: %s, finisehd", userID)
        // }(userID)
    }
}

于 2021-07-13T13:50:37.967 回答
2

@icza 是绝对正确的,没有“goroutine 清理”,您可以使用 webhook 或像gocraft这样的后台作业。我能想到的使用您的解决方案的唯一方法是将同步包用于学习目的。

func DefaultHandler(w http.ResponseWriter, r *http.Request) {
// Some DB calls
// Some business logics
var wg sync.WaitGroup
wg.Add(1)
go func() {
  defer wg.Done()
    // some Task taking 5 sec
}()
w.WriteHeader(http.StatusOK)
wg.wait()

}

于 2021-07-13T09:50:40.400 回答
1

您可以等待 goroutine 完成使用&sync.WaitGroup

// BusyTask
func BusyTask(t interface{}) error {
    var wg = &sync.WaitGroup{}

    wg.Add(1)
    go func() {
        // busy doing stuff
        time.Sleep(5 * time.Second)
        wg.Done()
    }()
    wg.Wait() // wait for goroutine

    return nil
}

// this will wait 5 second till goroutune finish
func main() {
    fmt.Println("hello")

    BusyTask("some task...")

    fmt.Println("done")
}

另一种方法是将 a 附加context.Context到 goroutine 并超时。

//
func BusyTaskContext(ctx context.Context, t string) error {
    done := make(chan struct{}, 1)
    //
    go func() {
        // time sleep 5 second
        time.Sleep(5 * time.Second)
        // do tasks and signle done
        done <- struct{}{}
        close(done)
    }()
    //
    select {
    case <-ctx.Done():
        return errors.New("timeout")
    case <-done:
        return nil
    }
}

//
func main() {
    fmt.Println("hello")

    ctx, cancel := context.WithTimeout(context.TODO(), 2*time.Second)
    defer cancel()

    if err := BusyTaskContext(ctx, "some task..."); err != nil {
        fmt.Println(err)
        return
    }

    fmt.Println("done")
}
于 2021-07-13T10:44:36.177 回答