2

我对 golang 比较陌生,我想创建一种方法来同时调用多个 URL,并解析 JSON 文档。但是,我真的不确定我是否正确使用了 goroutine 和 channels。在这一点上,我不确定我是否没有正确地“在 Go 中思考”,或者我对 goroutine 和通道的理解/方法是否不准确。

此外,在解析时,我想results从 body 中解析属性,这是一个数组,其中的每个元素都results包含一个doc我想过滤掉的属性。

目标是同时进行多次提取,并仅解析doc响应正文结果数组中的属性的响应。

绝对会感谢任何见解或建议以更好地理解事物。提前致谢。

package operations

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"
    "os"
    "strings"
)

// CouchbaseDoc parses .doc property from sync gateway documents
type CouchbaseDoc struct {
    Doc map[string]string `json:"doc"`
}

// Results deconstruct... results is a property of body, and is an array of obects
type Results struct {
    Results []byte `json:"results"`
}

func createURLs(channels []string) map[string]string {
    urlMap := make(map[string]string)

    domain := "swap" + strings.TrimSpace(os.Getenv("env"))
    bucket := strings.TrimSpace(os.Getenv("bucket"))
    for _, doctype := range channels {
        urlMap[doctype] = fmt.Sprintf("https://%s.endpoint.com/%s/_changes?filter=sync_gateway/bychannel&channels=%s&include_docs=true", domain, bucket, doctype)
    }

    return urlMap
}

func getChangesFeed(url string, ch chan map[string]string) {
    resp, _ := http.Get(url)

    body, _ := ioutil.ReadAll(resp.Body)

    go parseBody(body, ch)
}

func parseBody(body []byte, ch chan map[string]string) {
    var results Results
    var doc CouchbaseDoc
    json.Unmarshal(body, &results)
    json.Unmarshal(results.Results, &doc)
    // write to responses
    ch <- doc.Doc
}

func fetchDocs(channels []string) {
    urls := createURLs(channels)

    // Response channel is where all go routines will do the dirty
    responses := make(chan map[string]string)
    for _, url := range urls {
        go getChangesFeed(url, responses)
    }

    // Read from responses channel
    docs := <-responses
    for doc := range docs {
        fmt.Println(doc) // This should print results ??
    }
}
4

1 回答 1

4

修复

这一行:

docs := <-responses

只会从通道接收一个元素,而不是所有元素。但是,您可以在通道上的每个预期发送调用一次接收操作,这将是您的代码的最简单修复:

responses := make(chan map[string]string)
for _, url := range urls {
    go getChangesFeed(url, responses)
}

for x := 0; x < len(urls); x++ {
    fmt.Println(<-responses)
}

更多信息

请注意,您使用的是无缓冲通道,因为您没有为通道指定长度。for e := range ch {循环仅适用于缓冲通道,并且仅在缓冲通道关闭之后。

关闭缓冲通道表示不会在通道上发送更多数据,并且可能与您的程序设计不匹配(尤其是没有sync.WaitGroup)。

所以使用缓冲通道很好:你只需要知道发送和接收操作都不会继续,除非双方都准备好了:这意味着每个都被阻塞并等待另一个。

这很容易通过上面的代码完成,方法是将发送放入 goroutine 中,并使用具有相等计数器的循环在主 goroutine 中将相等数量的接收操作排队。

要了解更多信息,请阅读同步包文档的语言规范有效 Go以及 Mutex 和 WaitGroup 部分。

可运行演示

这是一个完整的、可运行的示例来演示该原理:

package main

import(
    "fmt"
    "time"
)

func Sleep1(ch chan int) {
    time.Sleep(time.Second)
    ch <- 1
}

func Sleep3(ch chan int) {
    time.Sleep(time.Second * 3)
    ch <- 3
}

func Sleep5(ch chan int) {
    time.Sleep(time.Second * 5)
    ch <- 5
}

func main() {
    ch := make(chan int)
    go Sleep1(ch)
    go Sleep3(ch)
    go Sleep5(ch)
    for x := 0; x < 3; x++ {
        fmt.Println(<-ch)
    }
}
于 2018-07-07T01:59:51.900 回答