
I am trying to design an HTTP client in Go that makes concurrent API calls to a web service and writes some data to a text file.

func getTotalCalls() int {
    reader := bufio.NewReader(os.Stdin)
    ...
    return callInt
}

getTotalCalls determines how many calls I will make; the input comes from the terminal.

func writeToFile(s string, namePrefix string) {
    fileStore := fmt.Sprintf("./data/%s_calls.log", namePrefix)
    ...
    defer f.Close()
    if _, err := f.WriteString(s); err != nil {
        log.Println(err)
    }
}

writeToFile synchronously writes data received from the buffered channel to a file.

func makeRequest(url string, ch chan<- string, id int) {
    var jsonStr = []byte(`{"from": "Saru", "message": "Saru to Discovery. Over!"}`)
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr))
    req.Header.Set("Content-Type", "application/json")
    client := &http.Client{}
    start := time.Now()
    resp, err := client.Do(req)
    if err != nil {
        panic(err)
    }
    secs := time.Since(start).Seconds()
    defer resp.Body.Close()
    body, _ := ioutil.ReadAll(resp.Body)
    ch <- fmt.Sprintf("%d, %.2f, %d, %s, %s\n", id, secs, len(body), url, body)
}

This is the function that makes the API call inside a goroutine.

Finally, here is the main function, which sends data from the goroutines to a buffered channel; I then range over the buffered channel of strings and write the data to a file.

func main() {
    urlPrefix := os.Getenv("STARCOMM_GO")
    url := urlPrefix + "discovery"
    totalCalls := getTotalCalls()
    queue := make(chan string, totalCalls)

    for i := 1; i <= totalCalls; i++ {
        go makeRequest(url, queue, i)
    }

    for item := range queue {
        fmt.Println(item)
        writeToFile(item, fmt.Sprint(totalCalls))
    }
}

The problem is that ranging over the buffered channel blocks once all calls have finished, and the program waits forever. Does anyone have a better design for this use case? My end goal is to benchmark the API endpoint with different numbers of concurrent POST requests (batches of 5, 10, 50, 100, 500, 1000 ... concurrent calls) and measure how long each call takes.


2 Answers


Something has to close(queue); otherwise range queue will block forever. If you want to range over queue, you must arrange for the channel to be closed after the final client finishes.

But... it is not even clear that you need range queue at all, because you know exactly how many results you will get: totalCalls. You just need to loop that many times, receiving from queue on each iteration.
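A minimal sketch of that counted-receive approach, reusing the question's names (the URL and the simulated request are stand-ins for the real HTTP call):

```go
package main

import (
	"fmt"
	"time"
)

// Stand-in for the question's makeRequest: it sends exactly one
// result on ch, which is all the receive loop below relies on.
func makeRequest(url string, ch chan<- string, id int) {
	time.Sleep(10 * time.Millisecond) // simulate network latency
	ch <- fmt.Sprintf("%d, %s", id, url)
}

func main() {
	totalCalls := 5
	queue := make(chan string, totalCalls)

	for i := 1; i <= totalCalls; i++ {
		go makeRequest("https://example.com/discovery", queue, i)
	}

	// Receive exactly totalCalls results; no close(queue) needed.
	for i := 0; i < totalCalls; i++ {
		fmt.Println(<-queue)
	}
}
```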

I believe your use case resembles the Worker Pools example on gobyexample, so you may want to check that out. Here is the code from that example:

// In this example we'll look at how to implement
// a _worker pool_ using goroutines and channels.

package main

import (
    "fmt"
    "time"
)

// Here's the worker, of which we'll run several
// concurrent instances. These workers will receive
// work on the `jobs` channel and send the corresponding
// results on `results`. We'll sleep a second per job to
// simulate an expensive task.
func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {

    // In order to use our pool of workers we need to send
    // them work and collect their results. We make 2
    // channels for this.
    const numJobs = 5
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // This starts up 3 workers, initially blocked
    // because there are no jobs yet.
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    // Here we send 5 `jobs` and then `close` that
    // channel to indicate that's all the work we have.
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    // Finally we collect all the results of the work.
    // This also ensures that the worker goroutines have
    // finished. An alternative way to wait for multiple
    // goroutines is to use a [WaitGroup](waitgroups).
    for a := 1; a <= numJobs; a++ {
        <-results
    }
}

Your "worker" makes an HTTP request, but otherwise it is almost exactly the same pattern. Note the for loop at the end, which reads from the channel a known number of times.
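Adapting that pattern to the question might look like the sketch below; callAPI stands in for the real HTTP call, and the URL and worker count are assumptions:

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-in for the question's HTTP call; a real
// version would use http.NewRequest / client.Do as in the question.
func callAPI(url string, id int) string {
	time.Sleep(10 * time.Millisecond) // simulate request latency
	return fmt.Sprintf("%d, %s", id, url)
}

// Each worker drains jobs until the channel is closed.
func worker(id int, url string, jobs <-chan int, results chan<- string) {
	for j := range jobs {
		results <- callAPI(url, j)
	}
}

func main() {
	const totalCalls = 10
	const numWorkers = 3 // the concurrency level being tested

	jobs := make(chan int, totalCalls)
	results := make(chan string, totalCalls)

	for w := 1; w <= numWorkers; w++ {
		go worker(w, "https://example.com/discovery", jobs, results)
	}

	for j := 1; j <= totalCalls; j++ {
		jobs <- j
	}
	close(jobs)

	// Collect exactly totalCalls results; since only this goroutine
	// consumes them, writing to a file here needs no extra locking.
	for a := 0; a < totalCalls; a++ {
		fmt.Println(<-results)
	}
}
```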

Answered 2021-02-14T16:43:24.737

If you need to limit the number of simultaneous requests, you can use a semaphore implemented with a buffered channel.

func makeRequest(url string, id int) string {
    var jsonStr = []byte(`{"from": "Saru", "message": "Saru to Discovery. Over!"}`)
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr))
    req.Header.Set("Content-Type", "application/json")
    client := &http.Client{}
    start := time.Now()
    resp, err := client.Do(req)
    if err != nil {
        panic(err)
    }
    secs := time.Since(start).Seconds()
    defer resp.Body.Close()
    body, _ := ioutil.ReadAll(resp.Body)
    return fmt.Sprintf("%d, %.2f, %d, %s, %s\n", id, secs, len(body), url, body)
}

func main() {
    urlPrefix := os.Getenv("STARCOMM_GO")
    url := urlPrefix + "discovery"
    totalCalls := getTotalCalls()
    concurrencyLimit := 50 // 5, 10, 50, 100, 500, 1000.

    // Declare semaphore as a buffered channel with capacity limited by concurrency level.
    semaphore := make(chan struct{}, concurrencyLimit)

    for i := 1; i <= totalCalls; i++ {
        // Take a slot in semaphore before proceeding.
        // Once all slots are taken this call will block until slot is freed.
        semaphore <- struct{}{}
        // Pass i as an argument so each goroutine gets its own copy of the
        // loop variable (before Go 1.22 the closure would share a single i).
        go func(i int) {
            // Release slot on job finish.
            defer func() { <-semaphore }()
            item := makeRequest(url, i)
            fmt.Println(item)
            // Beware that writeToFile will be called concurrently and may need some synchronization.
            writeToFile(item, fmt.Sprint(totalCalls))
        }(i)
    }

    // Wait for jobs to finish by filling semaphore to full capacity.
    for i := 0; i < cap(semaphore); i++ {
        semaphore <- struct{}{}
    }
    close(semaphore)
}
Answered 2021-02-14T22:50:22.037