26

I'm going through the Go Tour, and I feel like I have a good grasp of the language except for concurrency.

Slide 10 is an exercise that asks the reader to parallelize a web crawler (and to make it avoid fetching duplicates, but I haven't gotten there yet).

Here is what I have so far:

func Crawl(url string, depth int, fetcher Fetcher, ch chan string) {
    if depth <= 0 {
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        ch <- fmt.Sprintln(err)
        return
    }

    ch <- fmt.Sprintf("found: %s %q\n", url, body)
    for _, u := range urls {
        go Crawl(u, depth-1, fetcher, ch)
    }
}

func main() {
    ch := make(chan string, 100)
    go Crawl("http://golang.org/", 4, fetcher, ch)

    for i := range ch {
        fmt.Println(i)
    }
}

My question is: where do I put the call to close(ch)?

If I put a defer close(ch) somewhere in Crawl, the program ends up writing to a closed channel from one of the spawned goroutines, because the call to Crawl returns before the goroutines it spawned do.

If I omit the call to close(ch), as shown, the program deadlocks in main where it ranges over the channel, because the channel is never closed once all the goroutines have returned.


21 Answers

27

A look at the Parallelization section of Effective Go leads to ideas for the solution. Essentially you have to close the channel on each return path of the function. Actually, this is a nice use case for the defer statement:

func Crawl(url string, depth int, fetcher Fetcher, ret chan string) {
    defer close(ret)
    if depth <= 0 {
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        ret <- err.Error()
        return
    }

    ret <- fmt.Sprintf("found: %s %q", url, body)

    result := make([]chan string, len(urls))
    for i, u := range urls {
        result[i] = make(chan string)
        go Crawl(u, depth-1, fetcher, result[i])
    }

    for i := range result {
        for s := range result[i] {
            ret <- s
        }
    }

    return
}

func main() {
    result := make(chan string)
    go Crawl("http://golang.org/", 4, fetcher, result)

    for s := range result {
        fmt.Println(s)
    }
}

The essential difference to your code is that every instance of Crawl gets its own return channel, and the calling function collects the results from its return channel.
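
If you would rather keep the question's single shared channel, a minimal sketch of an alternative (assuming the tour's Fetcher and fetcher, plus a sync.WaitGroup to count outstanding crawlers) is to close the channel from one dedicated goroutine once every crawler has finished:

func Crawl(url string, depth int, fetcher Fetcher, ch chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    if depth <= 0 {
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        ch <- err.Error()
        return
    }

    ch <- fmt.Sprintf("found: %s %q", url, body)
    for _, u := range urls {
        wg.Add(1) // count each child before starting it
        go Crawl(u, depth-1, fetcher, ch, wg)
    }
}

func main() {
    ch := make(chan string, 100)
    var wg sync.WaitGroup

    wg.Add(1)
    go Crawl("http://golang.org/", 4, fetcher, ch, &wg)

    // Close the channel exactly once, after every Crawl goroutine has returned.
    go func() {
        wg.Wait()
        close(ch)
    }()

    for s := range ch {
        fmt.Println(s)
    }
}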

Answered 2012-11-04T22:57:42.893
14

I went in a completely different direction with this. I may have been misled by the hint about using a map.

// SafeUrlMap is safe to use concurrently.
type SafeUrlMap struct {
    v   map[string]string
    mux sync.Mutex
}

func (c *SafeUrlMap) Set(key string, body string) {
    c.mux.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    c.v[key] = body
    c.mux.Unlock()
}

// Value returns mapped value for the given key.
func (c *SafeUrlMap) Value(key string) (string, bool) {
    c.mux.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    defer c.mux.Unlock()
    val, ok := c.v[key]
    return val, ok
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, urlMap *SafeUrlMap) {
    defer wg.Done()

    if depth <= 0 {
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }

    // Record the page only after a successful fetch (body is not defined
    // before this point).
    urlMap.Set(url, body)

    for _, u := range urls {
        if _, ok := urlMap.Value(u); !ok {
            wg.Add(1)
            go Crawl(u, depth-1, fetcher, urlMap)
        }
    }

    return
}

var wg sync.WaitGroup

func main() {
    urlMap := &SafeUrlMap{v: make(map[string]string)}

    wg.Add(1)
    go Crawl("http://golang.org/", 4, fetcher, urlMap)
    wg.Wait()

    for url := range urlMap.v {
        body, _ := urlMap.Value(url)
        fmt.Printf("found: %s %q\n", url, body)
    }
}
Answered 2016-04-22T17:14:25.213
8

An O(1) map lookup of a url, instead of an O(n) scan over a slice of all visited urls, should help minimize the time spent inside the critical section. That amount of time is trivial for this example, but it becomes relevant at scale.
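
For comparison, the two forms of the visited check look roughly like this (seenSlice and seenMap are illustrative helpers, not part of the solution below):

// seenSlice scans every visited url: O(n) work inside the critical section.
func seenSlice(visited []string, url string) bool {
    for _, v := range visited {
        if v == url {
            return true
        }
    }
    return false
}

// seenMap is a single hash lookup: O(1) on average.
func seenMap(visited map[string]bool, url string) bool {
    return visited[url]
}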

A WaitGroup is used to keep the top-level Crawl() function from returning until all of its child goroutines are done.

func Crawl(url string, depth int, fetcher Fetcher) {
    var str_map = make(map[string]bool)
    var mux sync.Mutex
    var wg sync.WaitGroup

    var crawler func(string,int)
    crawler = func(url string, depth int) {
        defer wg.Done()

        if depth <= 0 {
            return
        }   

        mux.Lock()
        if _, ok := str_map[url]; ok {
            mux.Unlock()
            return
        }
        str_map[url] = true
        mux.Unlock()

        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q %q\n", url, body, urls)

        for _, u := range urls {
            wg.Add(1)
            go crawler(u, depth-1)          
        }       
    }
    wg.Add(1)
    crawler(url,depth)
    wg.Wait()   
}

func main() {
    Crawl("http://golang.org/", 4, fetcher)
}
Answered 2016-09-21T19:18:01.377
5

Similar idea to the accepted answer, but with no duplicate URLs fetched, and printing directly to the console. defer() is not used either. We use channels to signal when goroutines complete. The SafeMap idea is lifted from the SafeCounter presented earlier in the tour.

For the child goroutines we create an array of channels, and wait on each channel until every child has returned.

package main

import (
    "fmt"
    "sync"
)

// SafeMap is safe to use concurrently.
type SafeMap struct {
    v   map[string] bool
    mux sync.Mutex
}

// SetVal sets the value for the given key.
func (m *SafeMap) SetVal(key string, val bool) {
    m.mux.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    m.v[key] = val
    m.mux.Unlock()
}

// Value returns the current value of the counter for the given key.
func (m *SafeMap) GetVal(key string) bool {
    m.mux.Lock()
    // Lock so only one goroutine at a time can access the map c.v.
    defer m.mux.Unlock()
    return m.v[key]
}

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, status chan bool, urlMap *SafeMap) {

    // Check if we fetched this url previously.
    if ok := urlMap.GetVal(url); ok {
        //fmt.Println("Already fetched url!")
        status <- true
        return 
    }

    // Marking this url as fetched already.
    urlMap.SetVal(url, true)

    if depth <= 0 {
        status <- false
        return
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        status <- false
        return
    }

    fmt.Printf("found: %s %q\n", url, body)

    statuses := make ([]chan bool, len(urls)) 
    for index, u := range urls {
        statuses[index] = make (chan bool)
        go Crawl(u, depth-1, fetcher, statuses[index], urlMap)
    }

    // Wait for child goroutines.
    for _, childstatus := range(statuses) {
        <- childstatus
    }

    // And now this goroutine can finish.
    status <- true

    return
}

func main() {
    urlMap := &SafeMap{v: make(map[string]bool)}
    status := make(chan bool)
    go Crawl("https://golang.org/", 4, fetcher, status, urlMap)
    <- status
}

Answered 2019-03-09T05:17:36.067
3

I think using a map (the same way we'd use a set in other languages) plus a mutex is the easiest approach:

func Crawl(url string, depth int, fetcher Fetcher) {
    if depth <= 0 {
        return
    }
    mux.Lock()
    if IsVisited(url) {
        mux.Unlock()
        return
    }
    visit[url] = true
    mux.Unlock()
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("found: %s %q\n", url, body)
    for _, u := range urls {
        //
        go Crawl(u, depth-1, fetcher)
    }
    return
}

func IsVisited(s string) bool {
    _, ok := visit[s]
    return ok
}

var mux sync.Mutex

var visit = make(map[string]bool)

func main() {
    Crawl("https://golang.org/", 4, fetcher)
    time.Sleep(time.Second)
}
Answered 2019-07-21T16:41:01.683
2

This is my version (inspired by @fasmat's answer) — it prevents fetching the same URL twice by using a custom cache with an RWMutex.

type Cache struct {
    data map[string]fakeResult
    mux sync.RWMutex
}

var cache = Cache{data: make(map[string]fakeResult)}

//cache adds new page to the global cache
func (c *Cache) cache(url string) fakeResult {
    c.mux.Lock()
    defer c.mux.Unlock()

    // Re-check under the write lock: another goroutine may have cached
    // this url between the RUnlock in Visit and taking the lock here.
    if data, ok := c.data[url]; ok {
        return data
    }

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        body = err.Error()
    }

    data := fakeResult{body, urls}
    c.data[url] = data

    return data
}

//Visit visits the page at the given url and caches it if needed
func (c *Cache) Visit(url string) (data fakeResult, alreadyCached bool) {
    c.mux.RLock()
    data, alreadyCached = c.data[url]
    c.mux.RUnlock()

    if !alreadyCached {
        data = c.cache(url)
    }

    return data, alreadyCached
}

/*
Crawl crawles all pages reachable from url and within the depth (given by args).
Fetches pages using given fetcher and caches them in the global cache.
Continuously sends newly discovered pages to the out channel.
*/
func Crawl(url string, depth int, fetcher Fetcher, out chan string) {
    defer close(out)

    if depth <= 0 {
        return
    }

    data, alreadyCached := cache.Visit(url)
    if alreadyCached {
        return
    }

    //send newly discovered page to out channel
    out <- fmt.Sprintf("found: %s %q", url, data.body)

    //visit linked pages
    res := make([]chan string, len(data.urls))

    for i, link := range data.urls {
        res[i] = make(chan string)
        go Crawl(link, depth-1, fetcher, res[i])
    }

    //send newly discovered pages from links to out channel
    for i := range res {
        for s := range res[i] {
            out <- s
        }
    }
}

func main() {
    res := make(chan string)
    go Crawl("https://golang.org/", 4, fetcher, res)

    for page := range res {
        fmt.Println(page)   
    }
}

Besides not fetching any URL twice, this solution does not rely on knowing the total number of pages in advance (it works for any number of pages), and it does not artificially limit or prolong the program's execution time with timers.

Answered 2020-04-08T09:28:24.040
1

Here is my solution. I have a "master" routine that listens on a channel of urls and starts new crawling routines (which put the crawled urls into the channel) whenever it finds new urls to crawl.

Instead of explicitly closing the channel, I keep a counter of unfinished crawling goroutines; when the counter reaches 0, the program exits because there is nothing left to wait for.

func doCrawl(url string, fetcher Fetcher, results chan []string) {
    body, urls, err := fetcher.Fetch(url)
    results <- urls

    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Printf("found: %s %q\n", url, body)
    }
}



func Crawl(url string, depth int, fetcher Fetcher) {
    results := make(chan []string)
    crawled := make(map[string]bool)
    go doCrawl(url, fetcher, results)
    // counter for unfinished crawling goroutines
    toWait := 1

    for urls := range results {
        toWait--

        for _, u := range urls {
            if !crawled[u] {
                crawled[u] = true
                go doCrawl(u, fetcher, results)
                toWait++
            }
        }

        if toWait == 0 {
            break
        }
    }
}
Answered 2014-11-15T01:44:42.883
1

Here is my solution, using a sync.WaitGroup and a SafeCache of fetched URLs:

package main

import (
    "fmt"
    "sync"
)

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

// Safe to use concurrently
type SafeCache struct {
    fetched map[string]string
    mux     sync.Mutex
}

func (c *SafeCache) Add(url, body string) {
    c.mux.Lock()
    defer c.mux.Unlock()

    if _, ok := c.fetched[url]; !ok {
        c.fetched[url] = body
    }
}

func (c *SafeCache) Contains(url string) bool {
    c.mux.Lock()
    defer c.mux.Unlock()

    _, ok := c.fetched[url]
    return ok
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, cache *SafeCache,
    wg *sync.WaitGroup) {

    defer wg.Done()
    if depth <= 0 {
        return
    }
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("found: %s %q\n", url, body)
    cache.Add(url, body)
    for _, u := range urls {
        if !cache.Contains(u) {
            wg.Add(1)
            go Crawl(u, depth-1, fetcher, cache, wg)
        }
    }
    return
}

func main() {
    cache := SafeCache{fetched: make(map[string]string)}
    var wg sync.WaitGroup

    wg.Add(1)
    Crawl("http://golang.org/", 4, fetcher, cache, &wg)
    wg.Wait()
}
Answered 2017-09-23T16:57:56.270
1

Below is my solution. Apart from the global map, I only needed to change Crawl. Like other solutions, I use sync.Map and sync.WaitGroup. I've marked off the important parts.

var m sync.Map

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
    // This implementation doesn't do either:
    if depth <= 0 {
        return
    }
    // Don't fetch the same URL twice.
    /////////////////////////////////////
    _, ok := m.LoadOrStore(url, url)   //
    if ok {                            //
        return                         //
    }                                  //
    /////////////////////////////////////
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("found: %s %q\n", url, body)
    // Fetch URLs in parallel.
    /////////////////////////////////////
    var wg sync.WaitGroup              //
    defer wg.Wait()                    //
    for _, u := range urls {           //
        wg.Add(1)                      //
        go func(u string) {            //
            defer wg.Done()            //
            Crawl(u, depth-1, fetcher) //
        }(u)                           //
    }                                  //
    /////////////////////////////////////
    return
}
Answered 2019-05-12T23:36:54.563
1

I pass a safeCounter and a waitGroup to the Crawl function, then use the safeCounter to skip urls that have already been visited, and the waitGroup to prevent the current goroutine from exiting early.

func Crawl(url string, depth int, fetcher Fetcher, c *SafeCounter, wg *sync.WaitGroup) {
    defer wg.Done()

    if depth <= 0 {
        return
    }

    c.mux.Lock()
    c.v[url]++
    c.mux.Unlock()

    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("found: %s %q\n", url, body)
    for _, u := range urls {
        c.mux.Lock()
        i := c.v[u]
        c.mux.Unlock()
        if i > 0 {
            continue
        }
        wg.Add(1)
        go Crawl(u, depth-1, fetcher, c, wg)
    }
    return
}

func main() {
    c := SafeCounter{v: make(map[string]int)}
    var wg sync.WaitGroup
    wg.Add(1)
    Crawl("https://golang.org/", 4, fetcher, &c, &wg)
    wg.Wait()
}
Answered 2020-03-31T10:57:42.007
1
/*
Exercise: Web Crawler
In this exercise you'll use Go's concurrency features to parallelize a web crawler.

Modify the Crawl function to fetch URLs in parallel without fetching the same URL twice.

Hint: you can keep a cache of the URLs that have been fetched on a map, but maps alone are not safe for concurrent use!
*/

package main

import (
    "fmt"
    "sync"
    "time"
)

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

type Response struct {
    url  string
    urls []string
    body string
    err  error
}

var ch chan Response = make(chan Response)
var fetched map[string]bool = make(map[string]bool)
var wg sync.WaitGroup
var mu sync.Mutex

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
    // TODO: Fetch URLs in parallel.
    // TODO: Don't fetch the same URL twice.
    // This implementation doesn't do either:
    var fetch func(url string, depth int, fetcher Fetcher)
    wg.Add(1)
    recv := func() {
        for res := range ch {
            body, _, err := res.body, res.urls, res.err
            if err != nil {
                fmt.Println(err)
                continue
            }
            fmt.Printf("found: %s %q\n", url, body)
        }
    }

    fetch = func(url string, depth int, fetcher Fetcher) {
        time.Sleep(time.Second / 2)
        defer wg.Done()
        if depth <= 0 {
            return
        }
        mu.Lock()
        if fetched[url] {
            mu.Unlock()
            return
        }
        fetched[url] = true
        mu.Unlock()
        body, urls, err := fetcher.Fetch(url)
        for _, u := range urls {
            wg.Add(1)
            go fetch(u, depth-1, fetcher)
        }
        ch <- Response{url, urls, body, err}
    }

    go fetch(url, depth, fetcher)
    go recv()
    return
}

func main() {
    Crawl("https://golang.org/", 4, fetcher)
    wg.Wait()
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
    body string
    urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
    if res, ok := f[url]; ok {
        return res.body, res.urls, nil
    }
    return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
    "https://golang.org/": &fakeResult{
        "The Go Programming Language",
        []string{
            "https://golang.org/pkg/",
            "https://golang.org/cmd1/",
        },
    },
    "https://golang.org/pkg/": &fakeResult{
        "Packages",
        []string{
            "https://golang.org/",
            "https://golang.org/cmd2/",
            "https://golang.org/pkg/fmt/",
            "https://golang.org/pkg/os/",
        },
    },
    "https://golang.org/pkg/fmt/": &fakeResult{
        "Package fmt",
        []string{
            "https://golang.org/",
            "https://golang.org/pkg/",
        },
    },
    "https://golang.org/pkg/os/": &fakeResult{
        "Package os",
        []string{
            "https://golang.org/",
            "https://golang.org/pkg/",
        },
    },
}

https://gist.github.com/gaogao1030/5d63ed925534f3610ccb7e25ed46992a

Answered 2020-07-27T06:17:24.960
1

Here is my solution. I use empty structs as the values in the safe cache, since they don't allocate any memory. I based it on whossname's solution.
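
As a quick aside, the zero-memory claim can be checked with unsafe.Sizeof; this standalone snippet (separate from the solution below) compares a bool value with an empty struct:

    package main

    import (
        "fmt"
        "unsafe"
    )

    func main() {
        var b bool
        var e struct{}
        fmt.Println(unsafe.Sizeof(b)) // 1: each bool value occupies a byte
        fmt.Println(unsafe.Sizeof(e)) // 0: the empty struct occupies no space
    }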

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    type Fetcher interface {
        // Fetch returns the body of URL and
        // a slice of URLs found on that page.
        Fetch(url string) (body string, urls []string, err error)
    }
    
    type safeCache struct {
        m map[string]struct{}
        c sync.Mutex
    } 
    
    func (s *safeCache) Get(key string) bool {
        s.c.Lock()
        defer s.c.Unlock()
        _, ok := s.m[key]
        return ok
    }
    
    func (s *safeCache) Set(key string) {
        s.c.Lock()
        s.m[key] = struct{}{}
        s.c.Unlock()
        return
    }
    
    // Crawl uses fetcher to recursively crawl
    // pages starting with url, to a maximum of depth.
    func Crawl(url string, depth int, fetcher Fetcher, cach *safeCache) {
        defer wg.Done()
        // TODO: Fetch URLs in parallel.
        // TODO: Don't fetch the same URL twice.
        // This implementation doesn't do either:
        cach.Set(url)
        if depth <= 0 {
            return
        }
        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q\n", url, body)
        for _, u := range urls {
            if found := cach.Get(u); !found{
                wg.Add(1)
                go Crawl(u, depth-1, fetcher, cach)
            }   
        }
        return
    }
    
    var wg sync.WaitGroup
    
    func main() {
        urlSafe := &safeCache{m: make(map[string]struct{})}
        wg.Add(1)
        go Crawl("https://golang.org/", 4, fetcher, urlSafe)
        wg.Wait()
    }
    
    // fakeFetcher is Fetcher that returns canned results.
    type fakeFetcher map[string]*fakeResult
    
    type fakeResult struct {
        body string
        urls []string
    }
    
    func (f fakeFetcher) Fetch(url string) (string, []string, error) {
        if res, ok := f[url]; ok {
            return res.body, res.urls, nil
        }
        return "", nil, fmt.Errorf("not found: %s", url)
    }
    
    // fetcher is a populated fakeFetcher.
    var fetcher = fakeFetcher{
        "https://golang.org/": &fakeResult{
            "The Go Programming Language",
            []string{
                "https://golang.org/pkg/",
                "https://golang.org/cmd/",
            },
        },
        "https://golang.org/pkg/": &fakeResult{
            "Packages",
            []string{
                "https://golang.org/",
                "https://golang.org/cmd/",
                "https://golang.org/pkg/fmt/",
                "https://golang.org/pkg/os/",
            },
        },
        "https://golang.org/pkg/fmt/": &fakeResult{
            "Package fmt",
            []string{
                "https://golang.org/",
                "https://golang.org/pkg/",
            },
        },
        "https://golang.org/pkg/os/": &fakeResult{
            "Package os",
            []string{
                "https://golang.org/",
                "https://golang.org/pkg/",
            },
        },
    }

Answered 2021-08-23T10:32:59.047
0

I implemented it with a single channel that all the goroutines send their messages on. To make sure the channel is closed once there are no goroutines left, I use a safe counter that closes the channel when the counter reaches 0.

type Msg struct {
    url string
    body string
}

type SafeCounter struct {
    v int
    mux sync.Mutex
}

func (c *SafeCounter) inc() {
    c.mux.Lock()
    defer c.mux.Unlock()
    c.v++   
}

func (c *SafeCounter) dec(ch chan Msg) {
    c.mux.Lock()
    defer c.mux.Unlock()
    c.v--
    if c.v == 0 {
        close(ch)
    }
}

var goes SafeCounter = SafeCounter{v: 0}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, ch chan Msg) {
    defer goes.dec(ch)
    // TODO: Fetch URLs in parallel.
    // TODO: Don't fetch the same URL twice.
    // This implementation doesn't do either:
    if depth <= 0 {
        return
    }
    if !cache.existsAndRegister(url) {
        body, urls, err :=  fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        ch <- Msg{url, body}
        for _, u := range urls {
            goes.inc()
            go Crawl(u, depth-1, fetcher, ch)
        }
    }
    return
}

func main() {
    ch := make(chan Msg, 100)
    goes.inc()
    go Crawl("http://golang.org/", 4, fetcher, ch)
    for m := range ch {
        fmt.Printf("found: %s %q\n", m.url, m.body)
    }
}

Note that the safe counter must be incremented outside of the spawned goroutine.
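
To see why, compare the two orderings of the increment around the go statement in the loop above (a sketch using the same identifiers; only the second form is safe):

// Racy: the parent's deferred goes.dec(ch) may drive the counter to 0
// and close the channel before this goroutine ever runs goes.inc().
go func(u string) {
    goes.inc()
    Crawl(u, depth-1, fetcher, ch)
}(u)

// Safe: the child is counted before it starts, so the counter cannot
// reach 0 while any crawl is still pending.
goes.inc()
go Crawl(u, depth-1, fetcher, ch)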

Answered 2016-05-10T15:10:47.410
0

I avoided crawling a url twice by using a slice. The recursive version without concurrency works fine, but I'm not sure about this concurrent version.

func Crawl(url string, depth int, fetcher Fetcher) {
    var str_arrs []string
    var mux sync.Mutex

    var crawl func(string, int)
    crawl = func(url string, depth int) {
        if depth <= 0 {
            return
        }

        mux.Lock()
        for _, v := range str_arrs {
            if url == v {
                mux.Unlock()
                return
            }
        }
        str_arrs = append(str_arrs, url)
        mux.Unlock()

        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q\n", url, body)
        for _, u := range urls {
            go crawl(u, depth-1) // could delete “go” then it is recursive
        }
    }

    crawl(url, depth)
    return
}

func main() {
    Crawl("http://golang.org/", 4, fetcher)
}
Answered 2016-09-10T07:13:39.587
0

Below is a simple solution for parallelization using only sync.WaitGroup.

var fetchedUrlMap = make(map[string]bool)
var mutex sync.Mutex

func Crawl(url string, depth int, fetcher Fetcher) {
	if depth <= 0 {
		return
	}

	// Check and mark under the mutex so concurrent goroutines neither
	// race on the map nor fetch the same URL twice.
	mutex.Lock()
	if fetchedUrlMap[url] {
		mutex.Unlock()
		return
	}
	fetchedUrlMap[url] = true
	mutex.Unlock()

	body, urls, err := fetcher.Fetch(url)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("found: %s %q\n", url, body)

	var wg sync.WaitGroup
	for _, u := range urls {
		//	fmt.Println("Solving for ", u)
		wg.Add(1)
		go func(uv string) {
			Crawl(uv, depth-1, fetcher)
			wg.Done()
		}(u)
	}
	wg.Wait()
}

Answered 2019-02-14T14:30:50.033
0

这是我的解决方案:)

package main

import (
    "fmt"
    "runtime"
    "sync"
)

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, set map[string]bool) {
    // TODO: Fetch URLs in parallel.
    // TODO: Don't fetch the same URL twice.
    // This implementation doesn't do either:
    if depth <= 0 {
        return
    }
    // use a set to identify if the URL should be traversed or not
    if set[url] == true {
        wg.Done()
        return
    } else {
        fmt.Println(runtime.NumGoroutine())
        set[url] = true
        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q\n", url, body)
        for _, u := range urls {
            Crawl(u, depth-1, fetcher, set)
        }

    }

}

var wg sync.WaitGroup

func main() {
    wg.Add(6)
    collectedURLs := make(map[string]bool)
    go Crawl("https://golang.org/", 4, fetcher, collectedURLs)
    wg.Wait()
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
    body string
    urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
    if res, ok := f[url]; ok {
        return res.body, res.urls, nil
    }
    return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
    "https://golang.org/": &fakeResult{
        "The Go Programming Language",
        []string{
            "https://golang.org/pkg/",
            "https://golang.org/cmd/",
        },
    },
    "https://golang.org/pkg/": &fakeResult{
        "Packages",
        []string{
            "https://golang.org/",
            "https://golang.org/cmd/",
            "https://golang.org/pkg/fmt/",
            "https://golang.org/pkg/os/",
        },
    },
    "https://golang.org/pkg/fmt/": &fakeResult{
        "Package fmt",
        []string{
            "https://golang.org/",
            "https://golang.org/pkg/",
        },
    },
    "https://golang.org/pkg/os/": &fakeResult{
        "Package os",
        []string{
            "https://golang.org/",
            "https://golang.org/pkg/",
        },
    },
}
Answered 2020-02-16T18:29:29.893
0

Since most of the solutions here didn't work for me (including the accepted answer), I'll upload my own, inspired by Kamil's (special thanks :) (no duplicates, all valid URLs):

package main

import (
    "fmt"
    "runtime"
    "sync"
)

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, set map[string]bool) {
    // TODO: Fetch URLs in parallel.
    // TODO: Don't fetch the same URL twice.
    defer wg.Done()
    if depth <= 0 {
        return
    }
    // use a set to identify if the URL should be traversed or not;
    // check and mark under the mutex so concurrent goroutines don't race
    fmt.Println(runtime.NumGoroutine())
    mu.Lock()
    if set[url] {
        mu.Unlock()
        return
    }
    set[url] = true
    mu.Unlock()
    body, urls, err := fetcher.Fetch(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("found: %s %q\n", url, body)
    for _, u := range urls {
        wg.Add(1)
        go Crawl(u, depth-1, fetcher, set)
    }
}

var wg sync.WaitGroup
var mu sync.Mutex

func main() {
    collectedURLs := make(map[string]bool)
    wg.Add(1)
    Crawl("https://golang.org/", 4, fetcher, collectedURLs)
    wg.Wait()
}
Answered 2020-07-06T18:07:59.070
0

Here is my solution. I had a problem where the main function does not wait for the goroutines to print their status and finish. I noticed that the previous slide used a solution that waits 1 second before exiting, so I decided to use that approach. In practice, though, I believe some coordination mechanism would be better.

import (
    "fmt"
    "sync"
    "time"
)

type SafeMap struct {
    mu sync.Mutex
    v  map[string]bool
}

// Sets the given key to true.
func (sm *SafeMap) Set(key string) {
    sm.mu.Lock()
    sm.v[key] = true
    sm.mu.Unlock()
}

// Get returns the current value for the given key.
func (sm *SafeMap) Get(key string) bool {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    return sm.v[key]
}

var safeMap = SafeMap{v: make(map[string]bool)}

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
    if depth <= 0 {
        return
    }
    
    // if the value exists, don't fetch it twice
    if safeMap.Get(url) {
        return  
    }
    
    // check if there is an error fetching
    body, urls, err := fetcher.Fetch(url)
    safeMap.Set(url)
    if err != nil {
        fmt.Println(err)
        return
    }
    
    // list contents and crawl recursively
    fmt.Printf("found: %s %q\n", url, body)
    for _, u := range urls {
        go Crawl(u, depth-1, fetcher)
    }
}

func main() {
    go Crawl("https://golang.org/", 4, fetcher)
    time.Sleep(time.Second)
}
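
One possible coordination mechanism, sketched here as a variation of the code above (same SafeMap, Fetcher and fetcher; only a package-level sync.WaitGroup is added), is to count every Crawl call and wait instead of sleeping:

var wg sync.WaitGroup

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
    defer wg.Done()

    if depth <= 0 {
        return
    }

    // if the value exists, don't fetch it twice
    if safeMap.Get(url) {
        return
    }

    // check if there is an error fetching
    body, urls, err := fetcher.Fetch(url)
    safeMap.Set(url)
    if err != nil {
        fmt.Println(err)
        return
    }

    // list contents and crawl recursively
    fmt.Printf("found: %s %q\n", url, body)
    for _, u := range urls {
        wg.Add(1) // count the child before starting it
        go Crawl(u, depth-1, fetcher)
    }
}

func main() {
    wg.Add(1)
    go Crawl("https://golang.org/", 4, fetcher)
    wg.Wait()
}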
Answered 2020-11-02T12:51:13.030
0

No need to change any signatures or introduce anything new at global scope. We can use a sync.WaitGroup to wait for the recurse goroutines to finish. A map from strings to empty structs acts as a set and is the most memory-efficient way to keep track of the URLs that have already been crawled.

func Crawl(url string, depth int, fetcher Fetcher) {
    visited := make(map[string]struct{})
    var mu sync.Mutex
    var wg sync.WaitGroup

    var recurse func(string, int)
    recurse = func(url string, depth int) {
        defer wg.Done()

        if depth <= 0 {
            return
        }

        mu.Lock()
        if _, ok := visited[url]; ok {
            mu.Unlock()
            return
        }
        visited[url] = struct{}{}
        mu.Unlock()

        body, urls, err := fetcher.Fetch(url)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("found: %s %q\n", url, body)

        for _, u := range urls {
            wg.Add(1)
            go recurse(u, depth-1)
        }
    }

    wg.Add(1)
    go recurse(url, depth)
    wg.Wait()
}

func main() {
    Crawl("https://golang.org/", 4, fetcher)
}

Full demo on the Go Playground

Answered 2021-09-28T20:19:24.060
0

Using a mutex and channels

package main

import (
    "fmt"
    "sync"
)

type SafeMap struct {
    mu sync.Mutex
    seen map[string]bool
}

func (s *SafeMap) getVal(url string) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.seen[url]
}

func (s *SafeMap) setVal(url string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.seen[url] = true
}

var s = SafeMap{seen: make(map[string]bool)}

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, ch chan bool) {
    // TODO: Fetch URLs in parallel.
    // TODO: Don't fetch the same URL twice.
    // This implementation doesn't do either:
    if depth <= 0 || s.getVal(url) {
        ch <- false
        return
    }

    // Mark the url as seen before fetching so concurrent crawlers skip it.
    s.setVal(url)

    body, urls, err := fetcher.Fetch(url)

    if err != nil {
        fmt.Println(err)
        ch <- false
        return
    }
    fmt.Printf("found: %s %q\n", url, body)
    chs := make(map[string]chan bool, len(urls))
    for _, u := range urls {
        chs[u] = make(chan bool)
        go Crawl(u, depth-1, fetcher, chs[u])
    }
    for _,v := range urls {
        <-chs[v]
    }
    ch <- true
    return
}

func main() {
    ch := make(chan bool)
    go Crawl("https://golang.org/", 4, fetcher, ch)
    <-ch
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
    body string
    urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
    if res, ok := f[url]; ok {
        return res.body, res.urls, nil
    }
    return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
    "https://golang.org/": &fakeResult{
        "The Go Programming Language",
        []string{
            "https://golang.org/pkg/",
            "https://golang.org/cmd/",
        },
    },
    "https://golang.org/pkg/": &fakeResult{
        "Packages",
        []string{
            "https://golang.org/",
            "https://golang.org/cmd/",
            "https://golang.org/pkg/fmt/",
            "https://golang.org/pkg/os/",
        },
    },
    "https://golang.org/pkg/fmt/": &fakeResult{
        "Package fmt",
        []string{
            "https://golang.org/",
            "https://golang.org/pkg/",
        },
    },
    "https://golang.org/pkg/os/": &fakeResult{
        "Package os",
        []string{
            "https://golang.org/",
            "https://golang.org/pkg/",
        },
    },
}
Answered 2021-12-12T21:10:46.993
-1

I'm new here, so take this with a grain of salt, but this solution seems more idiomatic to me. It uses a single channel for all results, a single channel for all crawl requests (attempts to crawl a specific url), and a wait group for tracking completion. The main Crawl call acts as the distributor of crawl requests to the worker processes (while handling deduplication) and as the tracker of how many crawl requests are pending.

package main

import (
    "fmt"
    "sync"
)

type Fetcher interface {
    // Fetch returns the body of URL and
    // a slice of URLs found on that page.
    Fetch(url string) (body string, urls []string, err error)
}

type FetchResult struct {
    url  string
    body string
    err  error
}

type CrawlRequest struct {
    url   string
    depth int
}

type Crawler struct {
    depth           int
    fetcher         Fetcher
    results         chan FetchResult
    crawlRequests   chan CrawlRequest
    urlReservations map[string]bool
    waitGroup       *sync.WaitGroup
}

func (crawler Crawler) Crawl(url string, depth int) {
    defer crawler.waitGroup.Done()

    if depth <= 0 {
        return
    }

    body, urls, err := crawler.fetcher.Fetch(url)
    crawler.results <- FetchResult{url, body, err}

    if len(urls) == 0 {
        return
    }

    crawler.waitGroup.Add(len(urls))
    for _, url := range urls {
        crawler.crawlRequests <- CrawlRequest{url, depth - 1}
    }
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) (results chan FetchResult) {
    results = make(chan FetchResult)
    urlReservations := make(map[string]bool)
    crawler := Crawler{
        crawlRequests: make(chan CrawlRequest),
        depth:         depth,
        fetcher:       fetcher,
        results:       results,
        waitGroup:     &sync.WaitGroup{},
    }

    crawler.waitGroup.Add(1)

    // Listen for crawlRequests, pass them through to the caller if they aren't duplicates.
    go func() {
        for crawlRequest := range crawler.crawlRequests {
            if _, isReserved := urlReservations[crawlRequest.url]; isReserved {
                crawler.waitGroup.Done()
                continue
            }
            urlReservations[crawlRequest.url] = true
            go crawler.Crawl(crawlRequest.url, crawlRequest.depth)
        }
    }()

    // Wait for the wait group to finish, and then close the channel
    go func() {
        crawler.waitGroup.Wait()
        close(results)
    }()

    // Send the first crawl request to the channel
    crawler.crawlRequests <- CrawlRequest{url, depth}

    return
}

func main() {
    results := Crawl("https://golang.org/", 4, fetcher)
    for result := range results {
        if result.err != nil {
            fmt.Println(result.err)
            continue
        }
        fmt.Printf("found: %s %q\n", result.url, result.body)
    }

    fmt.Printf("done!")
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
    body string
    urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
    if res, ok := f[url]; ok {
        return res.body, res.urls, nil
    }
    return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
    "https://golang.org/": &fakeResult{
        "The Go Programming Language",
        []string{
            "https://golang.org/pkg/",
            "https://golang.org/cmd/",
        },
    },
    "https://golang.org/pkg/": &fakeResult{
        "Packages",
        []string{
            "https://golang.org/",
            "https://golang.org/cmd/",
            "https://golang.org/pkg/fmt/",
            "https://golang.org/pkg/os/",
        },
    },
    "https://golang.org/pkg/fmt/": &fakeResult{
        "Package fmt",
        []string{
            "https://golang.org/",
            "https://golang.org/pkg/",
        },
    },
    "https://golang.org/pkg/os/": &fakeResult{
        "Package os",
        []string{
            "https://golang.org/",
            "https://golang.org/pkg/",
        },
    },
}
Answered 2020-08-27T07:29:02.200