为高并发应用程序实现全局计数器的最佳方法是什么?就我而言,我可能有 10K-20K 的例程执行“工作”,我想计算例程共同处理的项目的数量和类型......
“经典”同步编码风格如下所示:
var work_counter int
func GoWorkerRoutine() {
for {
// do work
atomic.AddInt32(&work_counter,1)
}
}
现在这变得更加复杂,因为我想跟踪正在完成的工作的“类型”,所以我真的需要这样的东西:
var work_counter map[string]int
var work_mux sync.Mutex
func GoWorkerRoutine() {
for {
// do work
work_mux.Lock()
work_counter["type1"]++
work_mux.Unlock()
}
}
似乎应该有一种使用通道或类似的“go”优化方式:
var work_counter int
var work_chan chan int // make() called somewhere else (buffered)
// started somewher else
func GoCounterRoutine() {
for {
select {
case c := <- work_chan:
work_counter += c
break
}
}
}
func GoWorkerRoutine() {
for {
// do work
work_chan <- 1
}
}
最后一个示例仍然缺少地图,但这很容易添加。这种风格会比简单的原子增量提供更好的性能吗?当我们谈论对全局值的并发访问与可能阻止 I/O 完成的事情时,我无法判断这是否或多或少复杂......
想法受到赞赏。
2013 年 5 月 28 日更新:
我测试了几个实现,结果不是我预期的,这是我的计数器源代码:
package helpers
import (
)
type CounterIncrementStruct struct {
bucket string
value int
}
type CounterQueryStruct struct {
bucket string
channel chan int
}
var counter map[string]int
var counterIncrementChan chan CounterIncrementStruct
var counterQueryChan chan CounterQueryStruct
var counterListChan chan chan map[string]int
func CounterInitialize() {
counter = make(map[string]int)
counterIncrementChan = make(chan CounterIncrementStruct,0)
counterQueryChan = make(chan CounterQueryStruct,100)
counterListChan = make(chan chan map[string]int,100)
go goCounterWriter()
}
func goCounterWriter() {
for {
select {
case ci := <- counterIncrementChan:
if len(ci.bucket)==0 { return }
counter[ci.bucket]+=ci.value
break
case cq := <- counterQueryChan:
val,found:=counter[cq.bucket]
if found {
cq.channel <- val
} else {
cq.channel <- -1
}
break
case cl := <- counterListChan:
nm := make(map[string]int)
for k, v := range counter {
nm[k] = v
}
cl <- nm
break
}
}
}
func CounterIncrement(bucket string, counter int) {
if len(bucket)==0 || counter==0 { return }
counterIncrementChan <- CounterIncrementStruct{bucket,counter}
}
func CounterQuery(bucket string) int {
if len(bucket)==0 { return -1 }
reply := make(chan int)
counterQueryChan <- CounterQueryStruct{bucket,reply}
return <- reply
}
func CounterList() map[string]int {
reply := make(chan map[string]int)
counterListChan <- reply
return <- reply
}
它使用通道进行写入和读取,这似乎是合乎逻辑的。
这是我的测试用例:
func bcRoutine(b *testing.B,e chan bool) {
for i := 0; i < b.N; i++ {
CounterIncrement("abc123",5)
CounterIncrement("def456",5)
CounterIncrement("ghi789",5)
CounterIncrement("abc123",5)
CounterIncrement("def456",5)
CounterIncrement("ghi789",5)
}
e<-true
}
func BenchmarkChannels(b *testing.B) {
b.StopTimer()
CounterInitialize()
e:=make(chan bool)
b.StartTimer()
go bcRoutine(b,e)
go bcRoutine(b,e)
go bcRoutine(b,e)
go bcRoutine(b,e)
go bcRoutine(b,e)
<-e
<-e
<-e
<-e
<-e
}
var mux sync.Mutex
var m map[string]int
func bmIncrement(bucket string, value int) {
mux.Lock()
m[bucket]+=value
mux.Unlock()
}
func bmRoutine(b *testing.B,e chan bool) {
for i := 0; i < b.N; i++ {
bmIncrement("abc123",5)
bmIncrement("def456",5)
bmIncrement("ghi789",5)
bmIncrement("abc123",5)
bmIncrement("def456",5)
bmIncrement("ghi789",5)
}
e<-true
}
func BenchmarkMutex(b *testing.B) {
b.StopTimer()
m=make(map[string]int)
e:=make(chan bool)
b.StartTimer()
for i := 0; i < b.N; i++ {
bmIncrement("abc123",5)
bmIncrement("def456",5)
bmIncrement("ghi789",5)
bmIncrement("abc123",5)
bmIncrement("def456",5)
bmIncrement("ghi789",5)
}
go bmRoutine(b,e)
go bmRoutine(b,e)
go bmRoutine(b,e)
go bmRoutine(b,e)
go bmRoutine(b,e)
<-e
<-e
<-e
<-e
<-e
}
我实现了一个简单的基准测试,在地图周围只有一个互斥锁(只是测试写入),并用 5 个并行运行的 goroutine 对两者进行了基准测试。结果如下:
$ go test --bench=. helpers
PASS
BenchmarkChannels 100000 15560 ns/op
BenchmarkMutex 1000000 2669 ns/op
ok helpers 4.452s
我没想到互斥锁会这么快......
进一步的想法?