I recommend always keeping your goroutines under control to avoid exhausting memory and system resources. If you receive a lot of requests and start spawning goroutines in an uncontrolled way, sooner or later the system will go down.
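If you do need to spawn goroutines per request, bound how many can run at the same time. Here is a minimal, stdlib-only sketch of that idea using a buffered channel as a counting semaphore (the limit of 3 and the simulated work are placeholders, not part of the example further down):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    const maxConcurrent = 3                   // placeholder limit, tune it to your system
    sem := make(chan struct{}, maxConcurrent) // buffered channel used as a counting semaphore

    var wg sync.WaitGroup
    for i := 1; i <= 10; i++ {
        sem <- struct{}{} // blocks while maxConcurrent goroutines are already running
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot when this job is done
            fmt.Printf("job %d started\n", id)
            time.Sleep(500 * time.Millisecond) // simulated work
            fmt.Printf("job %d finished\n", id)
        }(i)
    }
    wg.Wait()
}

Each send on sem claims a slot and blocks once maxConcurrent goroutines are running; the deferred receive frees the slot, so the number of live goroutines can never grow without limit.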
In cases where you need to return a 200 OK immediately, the best approach is a message queue: the server only has to put a job on the queue, return OK and forget about it. The rest is processed asynchronously by a consumer.
Producer (HTTP server) >>> Queue >>> Consumer
Usually the queue is an external resource (RabbitMQ, AWS SQS, ...), but for teaching purposes you can achieve the same effect using a channel as the message queue.
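Just for reference, with a managed queue like SQS the "enqueue and forget" step would look roughly like this. This is only a sketch using the AWS SDK for Go v1; the queue URL and the payload are placeholders, and in a real server you would create the client once and reuse it inside the handler:

package main

import (
    "log"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
    sess := session.Must(session.NewSession()) // region/credentials come from the environment
    client := sqs.New(sess)

    // Instead of writing to a channel, the handler would send the job to the external queue.
    queueURL := "https://sqs.eu-west-1.amazonaws.com/123456789012/my-jobs" // placeholder
    _, err := client.SendMessage(&sqs.SendMessageInput{
        QueueUrl:    aws.String(queueURL),
        MessageBody: aws.String("user_id=1"), // the job payload
    })
    if err != nil {
        log.Fatalf("could not enqueue job: %v", err)
    }
    log.Println("job enqueued")
}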
In the example you will see how we create a channel to communicate between the two processes. Then we start the worker that reads from the channel, and finally we start the server with a handler that writes to the channel.
Play with the buffer size and the job duration while sending curl requests.
package main

import (
    "fmt"
    "log"
    "net/http"
    "time"
)

/*
$ go run .
curl "http://localhost:8080?user_id=1"
curl "http://localhost:8080?user_id=2"
curl "http://localhost:8080?user_id=3"
curl "http://localhost:8080?user_id=....."
*/

func main() {
    queueSize := 10
    // This is our queue, a channel to communicate processes. Queue size is the number of items that can be stored in the channel
    myJobQueue := make(chan string, queueSize) // Search for 'buffered channels'

    // Starts a worker that will read continuously from our queue
    go myBackgroundWorker(myJobQueue)

    // We start our server with a handler that receives the queue to write to it
    if err := http.ListenAndServe("localhost:8080", myAsyncHandler(myJobQueue)); err != nil {
        panic(err)
    }
}

func myAsyncHandler(myJobQueue chan<- string) http.HandlerFunc {
    return func(rw http.ResponseWriter, r *http.Request) {
        // We check that the query string has a 'user_id' query param
        if userID := r.URL.Query().Get("user_id"); userID != "" {
            select {
            case myJobQueue <- userID: // We try to put the item into the queue ...
                rw.WriteHeader(http.StatusOK)
                rw.Write([]byte(fmt.Sprintf("queuing user process: %s", userID)))
            default: // If we cannot write to the queue, it's because it is full!
                rw.WriteHeader(http.StatusInternalServerError)
                rw.Write([]byte(`our internal queue is full, try again later`))
            }
            return
        }
        rw.WriteHeader(http.StatusBadRequest)
        rw.Write([]byte(`missing 'user_id' in query params`))
    }
}

func myBackgroundWorker(myJobQueue <-chan string) {
    const (
        jobDuration = 10 * time.Second // simulation of a heavy background process
    )

    // We continuously read from our queue and process the jobs one by one.
    // In this loop we could spawn more goroutines in a controlled way to parallelize work
    // and increase the read throughput (see the sketch after this example), but I don't
    // want to overcomplicate the example.
    for userID := range myJobQueue {
        // rate limiter here ...
        // go func(u string) {
        log.Printf("processing user: %s, started", userID)
        time.Sleep(jobDuration)
        log.Printf("processing user: %s, finished", userID)
        // }(userID)
    }
}
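If a single consumer is too slow, the commented-out goroutine in myBackgroundWorker hints at the next step: fan the jobs out to a fixed pool of workers reading from the same channel, so concurrency stays bounded. A minimal sketch of that variant (the pool size of 3 is arbitrary; it reuses the log and time imports from the example above):

// Variant of myBackgroundWorker: a fixed pool of goroutines consumes from the
// same channel, so at most numWorkers jobs run in parallel.
func myBackgroundWorkerPool(myJobQueue <-chan string) {
    const (
        numWorkers  = 3                // arbitrary pool size, tune it to your workload
        jobDuration = 10 * time.Second // simulation of a heavy background process
    )
    for i := 0; i < numWorkers; i++ {
        go func(workerID int) {
            // Each job is delivered to exactly one of the workers ranging over the channel.
            for userID := range myJobQueue {
                log.Printf("worker %d: processing user: %s, started", workerID, userID)
                time.Sleep(jobDuration)
                log.Printf("worker %d: processing user: %s, finished", workerID, userID)
            }
        }(i)
    }
}

In main you would start it exactly like the single worker: go myBackgroundWorkerPool(myJobQueue).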