1

我需要创建一个将数据传递给多个消费者的队列。我可以使用缓冲通道和上下文吗?而且我不确定这是否是线程安全的

这是我正在谈论的示例代码:

package main

import (
    "context"
    "fmt"
    "strconv"
    "time"
)

func main() {
    runQueue()
}

func runQueue() {
    // When the buffer is full
    // sending channel is blocked
    queue := make(chan string, 10000)

    // If there are too few consumer,
    // the channel buffer will be full, and the sending channel will be blocked.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    consumerCount := 5
    go runProducer(queue, ctx, cancel)
    for i := 0; i < consumerCount; i++ {
        go runConsumer(queue, ctx)
    }
    select {
    case <-ctx.Done():
        // close channel to let goroutine get ctx.Done()
        close(queue)
    }
}

func runConsumer(queue chan string, ctx context.Context) {
    for {
        data := <-queue
        select {
        case <-ctx.Done():
            return
        default:

        }
        fmt.Println(data)
        <-time.After(time.Millisecond * 1000)
    }
}

func runProducer(queue chan string, ctx context.Context, cancel context.CancelFunc) {
    for {
        fmt.Println("get data from server")
        select {
        case <-ctx.Done():
            return
        default:

        }
        // dataList will be filled from other server
        dataList, err := getSomethingFromServer()
        if err != nil {
            if err.Error() == "very fatal error" {
                cancel()
                return
            }
            fmt.Println(err)
            continue
        }
        select {
        case <-ctx.Done():
            return
        default:

        }
        for _, el := range dataList {
            queue <- el
        }
        <-time.After(time.Millisecond * 2000)
    }
}

func getSomethingFromServer() ([]string, error) {
    var newList []string
    for i := 1; i < 4; i++ {
        newList = append(newList, strconv.Itoa(i))
    }
    return newList, nil
}

它是线程安全的吗?我的逻辑进展顺利吗?

如果有任何错误,我希望收到反馈

如果有更好的做法,请告诉我。

4

1 回答 1

1
  1. 上下文是线程安全的。https://go.dev/blog/context

一个 Context 对于多个 goroutine 同时使用是安全的。代码可以将单个 Context 传递给任意数量的 goroutine,并取消该 Context 以向所有 goroutine 发出信号。

所以在多个goroutines安全的go领域中〜线程安全的,因为你永远不知道goroutines在哪些线程(相同/不同)上运行

  1. 通道是线程安全的 - https://go.dev/ref/spec#Channel_types

单个通道可用于发送语句、接收操作以及由任意数量的 goroutine 调用内置函数 cap 和 len,而无需进一步同步

通道在后台使用互斥锁 https://github.com/golang/go/blob/master/src/runtime/chan.go#L51

  1. 对于并发模式,请查看非常好的 Go 博客文章:
于 2021-12-22T05:08:50.863 回答