64

为高并发应用程序实现全局计数器的最佳方法是什么?就我而言,我可能有 10K-20K 的例程执行“工作”,我想计算例程共同处理的项目的数量和类型......

“经典”同步编码风格如下所示:

var work_counter int

func GoWorkerRoutine() {
    for {
        // do work
        atomic.AddInt32(&work_counter,1)
    }    
}

现在这变得更加复杂,因为我想跟踪正在完成的工作的“类型”,所以我真的需要这样的东西:

var work_counter map[string]int
var work_mux sync.Mutex

func GoWorkerRoutine() {
    for {
        // do work
        work_mux.Lock()
        work_counter["type1"]++
        work_mux.Unlock()
    }    
}

似乎应该有一种使用通道或类似的“go”优化方式:

var work_counter int
var work_chan chan int // make() called somewhere else (buffered)

// started somewher else
func GoCounterRoutine() {
    for {
        select {
            case c := <- work_chan:
                work_counter += c
                break
        }
    }
}

func GoWorkerRoutine() {
    for {
        // do work
        work_chan <- 1
    }    
}

最后一个示例仍然缺少地图,但这很容易添加。这种风格会比简单的原子增量提供更好的性能吗?当我们谈论对全局值的并发访问与可能阻止 I/O 完成的事情时,我无法判断这是否或多或少复杂......

想法受到赞赏。

2013 年 5 月 28 日更新:

我测试了几个实现,结果不是我预期的,这是我的计数器源代码:

package helpers

import (
)

type CounterIncrementStruct struct {
    bucket string
    value int
}

type CounterQueryStruct struct {
    bucket string
    channel chan int
}

var counter map[string]int
var counterIncrementChan chan CounterIncrementStruct
var counterQueryChan chan CounterQueryStruct
var counterListChan chan chan map[string]int

func CounterInitialize() {
    counter = make(map[string]int)
    counterIncrementChan = make(chan CounterIncrementStruct,0)
    counterQueryChan = make(chan CounterQueryStruct,100)
    counterListChan = make(chan chan map[string]int,100)
    go goCounterWriter()
}

func goCounterWriter() {
    for {
        select {
            case ci := <- counterIncrementChan:
                if len(ci.bucket)==0 { return }
                counter[ci.bucket]+=ci.value
                break
            case cq := <- counterQueryChan:
                val,found:=counter[cq.bucket]
                if found {
                    cq.channel <- val
                } else {
                    cq.channel <- -1    
                }
                break
            case cl := <- counterListChan:
                nm := make(map[string]int)
                for k, v := range counter {
                    nm[k] = v
                }
                cl <- nm
                break
        }
    }
}

func CounterIncrement(bucket string, counter int) {
    if len(bucket)==0 || counter==0 { return }
    counterIncrementChan <- CounterIncrementStruct{bucket,counter}
}

func CounterQuery(bucket string) int {
    if len(bucket)==0 { return -1 }
    reply := make(chan int)
    counterQueryChan <- CounterQueryStruct{bucket,reply}
    return <- reply
}

func CounterList() map[string]int {
    reply := make(chan map[string]int)
    counterListChan <- reply
    return <- reply
}

它使用通道进行写入和读取,这似乎是合乎逻辑的。

这是我的测试用例:

func bcRoutine(b *testing.B,e chan bool) {
    for i := 0; i < b.N; i++ {
        CounterIncrement("abc123",5)
        CounterIncrement("def456",5)
        CounterIncrement("ghi789",5)
        CounterIncrement("abc123",5)
        CounterIncrement("def456",5)
        CounterIncrement("ghi789",5)
    }
    e<-true
}

func BenchmarkChannels(b *testing.B) {
    b.StopTimer()
    CounterInitialize()
    e:=make(chan bool)
    b.StartTimer()

    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)
    go bcRoutine(b,e)

    <-e
    <-e
    <-e
    <-e
    <-e

}

var mux sync.Mutex
var m map[string]int
func bmIncrement(bucket string, value int) {
    mux.Lock()
    m[bucket]+=value
    mux.Unlock()
}

func bmRoutine(b *testing.B,e chan bool) {
    for i := 0; i < b.N; i++ {
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
    }
    e<-true
}

func BenchmarkMutex(b *testing.B) {
    b.StopTimer()
    m=make(map[string]int)
    e:=make(chan bool)
    b.StartTimer()

    for i := 0; i < b.N; i++ {
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
        bmIncrement("abc123",5)
        bmIncrement("def456",5)
        bmIncrement("ghi789",5)
    }

    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)
    go bmRoutine(b,e)

    <-e
    <-e
    <-e
    <-e
    <-e

}

我实现了一个简单的基准测试,在地图周围只有一个互斥锁(只是测试写入),并用 5 个并行运行的 goroutine 对两者进行了基准测试。结果如下:

$ go test --bench=. helpers
PASS
BenchmarkChannels         100000             15560 ns/op
BenchmarkMutex   1000000              2669 ns/op
ok      helpers 4.452s

我没想到互斥锁会这么快......

进一步的想法?

4

9 回答 9

32

如果您尝试同步一组工作人员(例如,允许 n 个 goroutines 处理一些工作),那么通道是一种非常好的方法,但如果您真正需要的是一个计数器(例如页面浏览量) 那么他们就大材小用了。同步同步/原子包可以提供帮助。

import "sync/atomic"

type count32 int32

func (c *count32) inc() int32 {
    return atomic.AddInt32((*int32)(c), 1)
}

func (c *count32) get() int32 {
    return atomic.LoadInt32((*int32)(c))
}

去游乐场示例

于 2014-02-26T07:54:30.857 回答
24

不要使用同步/原子- 来自链接页面

包 atomic 提供了用于实现同步算法的低级原子内存原语。这些功能需要非常小心才能正确使用。除了特殊的低级应用程序外,最好使用通道或同步包的工具来完成同步

上次我不得不这样做时,我对一些看起来像你的第二个带有互斥锁的示例和看起来像你的第三个带有通道的示例的东西进行了基准测试。当事情变得非常繁忙时,通道代码会获胜,但请确保您使通道缓冲区变大。

于 2013-05-28T16:51:58.890 回答
11

不要仅仅因为您认为它们“不适合 Go”而害怕使用互斥锁和锁。在您的第二个示例中,绝对清楚发生了什么,这很重要。您必须自己尝试一下,看看该互斥锁有多满意,以及添加复杂性是否会提高性能。

如果您确实需要提高性能,也许分片是最好的选择: http ://play.golang.org/p/uLirjskGeN

缺点是您的计数只会在您的分片决定时保持最新。调用这么多也可能会影响性能time.Since(),但一如既往,首先测量它:)

于 2013-05-28T16:06:24.817 回答
7

使用同步/原子的另一个答案适用于页面计数器之类的东西,但不适用于向外部 API 提交唯一标识符。为此,您需要一个“增量和返回”操作,它只能作为 CAS 循环来实现。

这是一个围绕 int32 生成唯一消息 ID 的 CAS 循环:

import "sync/atomic"

type UniqueID struct {
    counter int32
}

func (c *UniqueID) Get() int32 {
    for {
        val := atomic.LoadInt32(&c.counter)
        if atomic.CompareAndSwapInt32(&c.counter, val, val+1) {
            return val
        }
    }
}

要使用它,只需执行以下操作:

requestID := client.msgID.Get()
form.Set("id", requestID)

与通道相比,它的优势在于它不需要太多额外的空闲资源——现有的 goroutine 会在它们请求 ID 时被使用,而不是为程序需要的每个计数器使用一个 goroutine。

TODO:针对渠道进行基准测试。我猜想频道在无争用情况下更糟,在高争用情况下更好,因为它们有排队,而这段代码只是为了赢得比赛而旋转。

于 2016-10-31T23:00:31.380 回答
6

老问题,但我只是偶然发现了这个,它可能会有所帮助:https ://github.com/uber-go/atomic

基本上,Uber 的工程师在sync/atomic包的基础上构建了一些不错的 util 函数

我还没有在生产中测试过,但是代码库非常小,大多数功能的实现都是标准的

绝对优于使用通道或基本互斥锁

于 2018-09-18T01:07:48.657 回答
3

最后一个很接近:

package main

import "fmt"

func main() {
    ch := make(chan int, 3)
    go GoCounterRoutine(ch)
    go GoWorkerRoutine(1, ch)
    // not run as goroutine because mein() would just end
    GoWorkerRoutine(2, ch)

}

// started somewhere else
func GoCounterRoutine(ch chan int) {
    counter := 0
    for {
        ch <- counter
        counter += 1
    }
}

func GoWorkerRoutine(n int, ch chan int) {
    var seq int
    for seq := range ch {
        // do work:
        fmt.Println(n, seq)
    }
}

这引入了单点故障:如果计数器 goroutine 死了,一切都将丢失。如果所有 goroutine 都在一台计算机上执行,这可能不是问题,但如果它们分散在网络上,则可能会成为问题。为了使计数器免受集群中单个节点故障的影响,必须使用特殊算法。

于 2013-05-28T03:29:05.237 回答
3

我用一个简单的 map + mutex 实现了这个,这似乎是处理这个问题的最佳方式,因为它是“最简单的方式”(这是 Go 所说的用于选择锁与通道的方法)。

package main

import (
    "fmt"
    "sync"
)

type single struct {
    mu     sync.Mutex
    values map[string]int64
}

var counters = single{
    values: make(map[string]int64),
}

func (s *single) Get(key string) int64 {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.values[key]
}

func (s *single) Incr(key string) int64 {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.values[key]++
    return s.values[key]
}

func main() {
    fmt.Println(counters.Incr("bar"))
    fmt.Println(counters.Incr("bar"))
    fmt.Println(counters.Incr("bar"))
    fmt.Println(counters.Get("foo"))
    fmt.Println(counters.Get("bar"))

}

您可以在https://play.golang.org/p/9bDMDLFBAY上运行代码。我在 gist.github.com 上做了一个简单的打包版本

于 2016-11-07T16:58:29.380 回答
2

自己看看,让我知道你的想法。

src/test/helpers/helpers.go

package helpers

type CounterIncrementStruct struct {
    bucket string
    value  int
}

type CounterQueryStruct struct {
    bucket  string
    channel chan int
}

var counter map[string]int
var counterIncrementChan chan CounterIncrementStruct
var counterQueryChan chan CounterQueryStruct
var counterListChan chan chan map[string]int

func CounterInitialize() {
    counter = make(map[string]int)
    counterIncrementChan = make(chan CounterIncrementStruct, 0)
    counterQueryChan = make(chan CounterQueryStruct, 100)
    counterListChan = make(chan chan map[string]int, 100)
    go goCounterWriter()
}

func goCounterWriter() {
    for {
        select {
        case ci := <-counterIncrementChan:
            if len(ci.bucket) == 0 {
                return
            }
            counter[ci.bucket] += ci.value
            break
        case cq := <-counterQueryChan:
            val, found := counter[cq.bucket]
            if found {
                cq.channel <- val
            } else {
                cq.channel <- -1
            }
            break
        case cl := <-counterListChan:
            nm := make(map[string]int)
            for k, v := range counter {
                nm[k] = v
            }
            cl <- nm
            break
        }
    }
}

func CounterIncrement(bucket string, counter int) {
    if len(bucket) == 0 || counter == 0 {
        return
    }
    counterIncrementChan <- CounterIncrementStruct{bucket, counter}
}

func CounterQuery(bucket string) int {
    if len(bucket) == 0 {
        return -1
    }
    reply := make(chan int)
    counterQueryChan <- CounterQueryStruct{bucket, reply}
    return <-reply
}

func CounterList() map[string]int {
    reply := make(chan map[string]int)
    counterListChan <- reply
    return <-reply
}

src/test/distributed/distributed.go

package distributed

type Counter struct {
    buckets map[string]int
    incrQ   chan incrQ
    readQ   chan readQ
    sumQ    chan chan int
}

func New() Counter {
    c := Counter{
        buckets: make(map[string]int, 100),
        incrQ:   make(chan incrQ, 1000),
        readQ:   make(chan readQ, 0),
        sumQ:    make(chan chan int, 0),
    }
    go c.run()
    return c
}

func (c Counter) run() {
    for {
        select {
        case a := <-c.readQ:
            a.res <- c.buckets[a.bucket]
        case a := <-c.sumQ:
            var sum int
            for _, cnt := range c.buckets {
                sum += cnt
            }
            a <- sum
        case a := <-c.incrQ:
            c.buckets[a.bucket] += a.count
        }
    }
}

func (c Counter) Get(bucket string) int {
    res := make(chan int)
    c.readQ <- readQ{bucket: bucket, res: res}
    return <-res
}

func (c Counter) Sum() int {
    res := make(chan int)
    c.sumQ <- res
    return <-res
}

type readQ struct {
    bucket string
    res    chan int
}

type incrQ struct {
    bucket string
    count  int
}

func (c Counter) Agent(bucket string, limit int) *Agent {
    a := &Agent{
        bucket:   bucket,
        limit:    limit,
        sendIncr: c.incrQ,
    }
    return a
}

type Agent struct {
    bucket   string
    limit    int
    count    int
    sendIncr chan incrQ
}

func (a *Agent) Incr(n int) {
    a.count += n
    if a.count > a.limit {
        select {
        case a.sendIncr <- incrQ{bucket: a.bucket, count: a.count}:
            a.count = 0
        default:
        }
    }
}

func (a *Agent) Done() {
    a.sendIncr <- incrQ{bucket: a.bucket, count: a.count}
    a.count = 0
}

src/test/helpers_test.go

package counters

import (
    "sync"
    "testing"
)

var mux sync.Mutex
var m map[string]int

func bmIncrement(bucket string, value int) {
    mux.Lock()
    m[bucket] += value
    mux.Unlock()
}

func BenchmarkMutex(b *testing.B) {
    b.StopTimer()
    m = make(map[string]int)
    buckets := []string{
        "abc123",
        "def456",
        "ghi789",
    }
    b.StartTimer()

    var wg sync.WaitGroup
    wg.Add(b.N)
    for i := 0; i < b.N; i++ {
        go func() {
            for _, b := range buckets {
                bmIncrement(b, 5)
            }
            for _, b := range buckets {
                bmIncrement(b, 5)
            }
            wg.Done()
        }()
    }
    wg.Wait()
}

src/test/distributed_test.go

package counters

import (
    "sync"
    "test/counters/distributed"
    "testing"
)

func BenchmarkDistributed(b *testing.B) {
    b.StopTimer()
    counter := distributed.New()
    agents := []*distributed.Agent{
        counter.Agent("abc123", 100),
        counter.Agent("def456", 100),
        counter.Agent("ghi789", 100),
    }
    b.StartTimer()

    var wg sync.WaitGroup
    wg.Add(b.N)
    for i := 0; i < b.N; i++ {
        go func() {
            for _, a := range agents {
                a.Incr(5)
            }
            for _, a := range agents {
                a.Incr(5)
            }
            wg.Done()
        }()
    }
    for _, a := range agents {
        a.Done()
    }
    wg.Wait()
}

结果

$ go test --bench=. --count 10 -benchmem
goos: linux
goarch: amd64
pkg: test/counters
BenchmarkDistributed-4       3356620           351 ns/op          24 B/op          0 allocs/op
BenchmarkDistributed-4       3414073           368 ns/op          11 B/op          0 allocs/op
BenchmarkDistributed-4       3371878           374 ns/op           7 B/op          0 allocs/op
BenchmarkDistributed-4       3240631           387 ns/op           3 B/op          0 allocs/op
BenchmarkDistributed-4       3169230           389 ns/op           2 B/op          0 allocs/op
BenchmarkDistributed-4       3177606           386 ns/op           0 B/op          0 allocs/op
BenchmarkDistributed-4       3064552           390 ns/op           0 B/op          0 allocs/op
BenchmarkDistributed-4       3065877           409 ns/op           2 B/op          0 allocs/op
BenchmarkDistributed-4       2924686           400 ns/op           1 B/op          0 allocs/op
BenchmarkDistributed-4       3049873           389 ns/op           0 B/op          0 allocs/op
BenchmarkMutex-4             1000000          1106 ns/op          17 B/op          0 allocs/op
BenchmarkMutex-4              948331          1246 ns/op           9 B/op          0 allocs/op
BenchmarkMutex-4             1000000          1244 ns/op          12 B/op          0 allocs/op
BenchmarkMutex-4             1000000          1246 ns/op          11 B/op          0 allocs/op
BenchmarkMutex-4             1000000          1228 ns/op           1 B/op          0 allocs/op
BenchmarkMutex-4             1000000          1235 ns/op           2 B/op          0 allocs/op
BenchmarkMutex-4             1000000          1244 ns/op           1 B/op          0 allocs/op
BenchmarkMutex-4             1000000          1214 ns/op           0 B/op          0 allocs/op
BenchmarkMutex-4              956024          1233 ns/op           0 B/op          0 allocs/op
BenchmarkMutex-4             1000000          1213 ns/op           0 B/op          0 allocs/op
PASS
ok      test/counters   37.461s

如果将限制值更改为 1000,代码会变得更快,立即无后顾之忧

$ go test --bench=. --count 10 -benchmem
goos: linux
goarch: amd64
pkg: test/counters
BenchmarkDistributed-4       5463523           221 ns/op           0 B/op          0 allocs/op
BenchmarkDistributed-4       5455981           220 ns/op           0 B/op          0 allocs/op
BenchmarkDistributed-4       5591240           213 ns/op           0 B/op          0 allocs/op
BenchmarkDistributed-4       5277915           212 ns/op           0 B/op          0 allocs/op
BenchmarkDistributed-4       5430421           213 ns/op           0 B/op          0 allocs/op
BenchmarkDistributed-4       5374153           226 ns/op           0 B/op          0 allocs/op
BenchmarkDistributed-4       5656743           219 ns/op           0 B/op          0 allocs/op
BenchmarkDistributed-4       5337343           211 ns/op           0 B/op          0 allocs/op
BenchmarkDistributed-4       5353845           217 ns/op           0 B/op          0 allocs/op
BenchmarkDistributed-4       5416137           217 ns/op           0 B/op          0 allocs/op
BenchmarkMutex-4             1000000          1002 ns/op         135 B/op          0 allocs/op
BenchmarkMutex-4             1253211          1141 ns/op          58 B/op          0 allocs/op
BenchmarkMutex-4             1000000          1261 ns/op           3 B/op          0 allocs/op
BenchmarkMutex-4              987345          1678 ns/op          59 B/op          0 allocs/op
BenchmarkMutex-4              925371          1247 ns/op           0 B/op          0 allocs/op
BenchmarkMutex-4             1000000          1259 ns/op           2 B/op          0 allocs/op
BenchmarkMutex-4              978800          1248 ns/op           0 B/op          0 allocs/op
BenchmarkMutex-4              982144          1213 ns/op           0 B/op          0 allocs/op
BenchmarkMutex-4              975681          1254 ns/op           0 B/op          0 allocs/op
BenchmarkMutex-4              994789          1205 ns/op           0 B/op          0 allocs/op
PASS
ok      test/counters   34.314s

更改 Counter.incrQ 长度也会极大地影响性能,尽管它会占用更多内存。

于 2020-09-13T09:31:27.647 回答
1

如果您的工作计数器类型不是动态的,即您可以预先将它们全部写出来,我认为您不会比这更简单或更快。

没有互斥体,没有通道,没有映射。只是一个静态大小的数组和一个枚举。

type WorkType int

const (
    WorkType1 WorkType = iota
    WorkType2
    WorkType3
    WorkType4
    NumWorkTypes
)

var workCounter [NumWorkTypes]int64

func updateWorkCount(workType WorkType, delta int) {
    atomic.AddInt64(&workCounter[workType], int64(delta))
}

用法如下:

updateWorkCount(WorkType1, 1)

如果您有时需要将工作类型用作字符串以进行显示,您始终可以使用像stringer这样的工具生成代码

于 2020-12-22T09:45:05.357 回答