我在 RabbitMQ 中有一个消费者-生产者方式的队列,它可以作为基本的循环队列正常工作。
我的问题是我试图限制每秒处理的请求数,因为当我使一个项目出队时,我向一个 DO 空间发出请求,如果我在一秒钟内发出 750 个或更多请求,它将阻止我的 IP。我使用 goroutines 来同时出列项目,但我只想每秒一次出列 500 个项目以避免达到该限制。这需要考虑当前正在出队的项目(即我不能只从队列中拉出 500 个项目然后延迟到下一秒),基本上在它运行出队代码之前,它需要等待以确保有在那一秒内出列的请求还没有超过 500 个。到目前为止我有这段代码,但它似乎不能正常工作(注意我正在测试每秒 2 个请求,而不是现在的 500 个)。它每隔一段时间就会有很长的延迟(比如 20 多秒),我不确定它是否正确计算了限制。请注意,我很确定 prefetch 选项不是我在这里需要的,因为它限制了每秒传入的消息数量,这里我只想限制每秒同时出队的消息。
import (
"os"
"fmt"
"github.com/streadway/amqp"
"golang.org/x/time/rate"
"context"
)
// Rate-limit => 2 req/s
const (
workers = 2
)
func failOnErrorWorker(err error, msg string) {
if err != nil {
fmt.Println(msg)
fmt.Println(err)
}
}
func main() {
// Get the env variables for the queue name and connection string
queueName := os.Getenv("QUEUE_NAME")
connectionString := os.Getenv("CONNECTION_STRING")
// Set up rate limiter and context
limiter := rate.NewLimiter(2, 1)
ctx := context.Background()
// Connect to the rabbitmq instance
conn, err := amqp.Dial(connectionString)
failOnErrorWorker(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// Open a channel for the queue
ch, err := conn.Channel()
failOnErrorWorker(err, "Failed to open a channel")
defer ch.Close()
// Consume the messages from this queue
msgs, err := ch.Consume(
queueName, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnErrorWorker(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
// Wait until there are less than 2 workers per second
limiter.Wait(ctx)
go func() {
// Dequeue the item and acknowledge the message
DeQueue(d.Body)
d.Ack(false)
} ()
}
}()
fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")
// Continually run the worker
<-forever
}