2

我很难理解 goroutines、channels 和所有同步的东西。我相信我理解这些概念,但我缺少几行来连接我拥有的所有信息。此外,大多数示例都感觉​​太简单了,因此我无法正确掌握实际发生的情况。

我正在为网站编写一个简单的分析工具。其中一项功能是检查是否可以访问本网站上的所有链接。显然,每个网站上都有很多链接,所以它似乎是一个很好的 goroutines 候选者。问题是,在安排了所有 goroutine 之后,我需要取回所有结果,以便一次将它们全部呈现给用户。

到目前为止,我所拥有的是:

func links(u *url.URL, d *goquery.Document) (links []models.Link) {
    wg := sync.WaitGroup{}

    d.Find("a[href]").Each(func(index int, item *goquery.Selection) {
        go func() {
            wg.Add(1)
            href, _ := item.Attr("href")
            url, _ := url.Parse(href)
            var internal bool

            if url.Host == "" {
                url.Scheme = u.Scheme
                url.Host = u.Host
            }

            links = append(links, models.Link{
                URL:       url,
                Reachable: Reachable(url.String()),
            })

            wg.Done()
        }()
    })

    wg.Wait()

    return
}

func Reachable(u string) bool {
    res, err := http.Head(u)
    if err != nil {
        return false
    }

    return res.StatusCode == 200
}

我的代码似乎可以工作,但我觉得我错过了一些东西(或者至少它可能会更好)。我有几个担忧/问题:

  1. 如果网站包含 1000 个链接,我会生成 1000 个 goroutine,我相信它并不那么聪明。可能我需要一个工作池或类似的东西,对吧?
  2. 是否可以仅在此示例中使用通道?我不知道goquery会找到多少个链接,所以我不能轻易地将range元素发送到频道。另外,我不能轻易地向done另一个频道发送一些消息,因为我不知道什么时候Each会结束。此通道上的每个for range通道都处于阻塞状态,因此应用程序正在恢复同步。
  3. 我相信这在应用程序中很常见,您开始迭代某些东西,并且您希望在每次迭代中做一些异步工作,并在迭代结束时收集所有结果。我无法理解这个概念。我想不出如何处理这种情况。
4

1 回答 1

2

您可以使用信号量来限制并发性。这仍然会产生“1000 个 goroutines”,但确保在给定时间只有 5 个 http 请求在运行。您可以更改 的值maxParallel以增加或减少并行请求的数量。

func links(u *url.URL, d *goquery.Document) (links []models.Link) {
    wg := sync.WaitGroup{}
    linkChan := make(chan models.Link)
    doneChan := make(chan struct{})
    maxParallel := 5
    semaphore := make(chan struct{}, maxParallel)
    d.Find("a[href]").Each(func(index int, item *goquery.Selection) {
        wg.Add(1)
        go func() {
            semaphore <- struct{}{}
            href, _ := item.Attr("href")
            url, _ := url.Parse(href)

            if url.Host == "" {
                url.Scheme = u.Scheme
                url.Host = u.Host
            }
            linkChan <- models.Link{
                URL:       url,
                Reachable: Reachable(url.String()),
            }
            wg.Done()
            <-semaphore
        }()
    })
    go func() {
        wg.Wait()
        doneChan <- struct{}{}
    }()

    // Drain the channel
    for {
        select {
        case l := <-linkChan:
            links = append(links, l)
        case <-doneChan:
            return
        }
    }
    return
}

func Reachable(u string) bool {
    res, err := http.Head(u)
    if err != nil {
        return false
    }

    return res.StatusCode == 200
}
于 2021-02-05T16:14:17.910 回答