我正在探索用 etcd 实现分布式环境下的序列号生成器。我的需求是生成不重复的序列号,供同一应用程序的多个实例在处理每个请求时使用;而且可能有 n 个这样的应用程序都有同样的需求。我使用 Go 客户端包中提供的 STM 和互斥锁(Mutex),以多种方式为此做了 POC(概念验证)。
我在本地机器上使用单节点 etcd 服务器(生产环境中将是至少 3 个节点、基于 Raft 协议的集群),编写了一个简单的程序,在 500 个 goroutine 中生成 ID(数字)。每个 goroutine 生成 10 个 ID,因此总共生成 5000 个 ID。从耗时统计来看,带重试的 STM 比互斥锁表现更好。除了这些方法之外,还有没有更好的方式来实现序列号生成?更根本的问题是:etcd 适合用于这个目的吗?
PS:附上的代码示例仅供参考,我并不是想请大家审查这段代码。我关心的是使用 etcd 生成序列号的正确方法。
package main
import (
"context"
"errors"
"strconv"
"sync"
"sync/atomic"
"time"
CONC "go.etcd.io/etcd/clientv3/concurrency"
"github.com/golang/glog"
ETCD "go.etcd.io/etcd/clientv3"
)
// Package-level state shared by main and the generator methods.
var (
	// client is the etcd connection; main assigns it before any ID is requested.
	client *ETCD.Client
	// deadline bounds the context created for each ID-generation call.
	deadline = 200 * time.Second
)
func main() {
var err error
client, err = ETCD.New(ETCD.Config{
Endpoints: []string{"127.0.0.1:2379"},
})
if err != nil {
glog.Errorln("err:", err)
return
}
idGen := &SeqIDGenerator{key: "_id"}
err = func() error {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
_, err = client.Put(ctx, idGen.key, strconv.FormatInt(0, 10))
return err
}()
if err != nil {
glog.Errorln("err:", err)
return
}
id, err := idGen.nextWithMutex()
if err != nil {
glog.Errorln("err:", err)
return
}
glog.Errorln("done", id)
id, err = idGen.nextWithSTMSerialiazable()
if err != nil {
glog.Errorln("err:", err)
return
}
glog.Errorln("done", id)
// st := time.Now()
// stressSTMSerialiazableSeq(idGen)
// glog.Errorln(time.Since(st))
}
// SeqIDGenerator hands out sequence numbers backed by a counter stored in etcd.
type SeqIDGenerator struct {
// key is the etcd key whose value holds the counter as a base-10 string.
key string
}
// nextWithSTMSerialiazable increments the counter stored at idGen.key inside a
// serializable STM transaction and returns the new value.
//
// All attempts share one context bounded by deadline. A failed transaction is
// attempted again, up to retry times in total; the loop stops early once the
// context is done because no later attempt could succeed. When every attempt
// fails, the returned error includes the message of the last underlying
// failure instead of hiding it.
func (idGen *SeqIDGenerator) nextWithSTMSerialiazable() (int64, error) {
	ctx, cancel := context.WithTimeout(context.Background(), deadline)
	defer cancel()

	var next int64
	// The STM runner may invoke this function more than once, so next is
	// recomputed from the freshly read value on every invocation.
	increment := func(s CONC.STM) error {
		current, parseErr := strconv.ParseInt(s.Get(idGen.key), 10, 64)
		if parseErr != nil {
			return parseErr
		}
		next = current + 1
		s.Put(idGen.key, strconv.FormatInt(next, 10))
		return nil
	}

	var lastErr error
	for attempt := 0; attempt < retry; attempt++ {
		resp, err := CONC.NewSTMSerializable(ctx, client, increment)
		if err == nil && resp.Succeeded {
			return next, nil
		}
		if err != nil {
			lastErr = err
		}
		if ctx.Err() != nil {
			// Deadline hit or context cancelled: further attempts cannot succeed.
			break
		}
	}

	if lastErr != nil {
		return 0, errors.New("ID gen failed. Retry exceeded: " + lastErr.Error())
	}
	return 0, errors.New("ID gen failed. Retry exceeded")
}
// nextWithMutex increments the counter stored at idGen.key while holding an
// etcd mutex on the same key prefix, and returns the new value.
//
// Each call creates its own session, takes the lock, reads the counter, writes
// the incremented value back and releases the lock. It returns an error if the
// session cannot be created, the lock cannot be acquired within deadline, the
// key is missing or not a base-10 integer, or the read/write fails.
func (idGen *SeqIDGenerator) nextWithMutex() (int64, error) {
	s, err := CONC.NewSession(client) // explore options to pass
	if err != nil {
		return 0, err
	}
	// Close the per-call session so its lease and keep-alive are not leaked.
	// Deferred first so it runs last, after the mutex has been released.
	defer s.Close()

	ctx, cancel := context.WithTimeout(context.Background(), deadline)
	defer cancel()

	m := CONC.NewMutex(s, idGen.key)
	// Without the lock the read-modify-write below is not exclusive, so a
	// failed Lock must abort the call.
	if err := m.Lock(ctx); err != nil {
		return 0, err
	}
	defer func() {
		// ctx may already be expired or cancelled here; unlock with a fresh one.
		unlockCtx, unlockCancel := context.WithTimeout(context.Background(), deadline)
		defer unlockCancel()
		if unlockErr := m.Unlock(unlockCtx); unlockErr != nil {
			glog.Errorln("err:", unlockErr)
		}
	}()

	resp, err := client.Get(ctx, idGen.key)
	if err != nil {
		return 0, err
	}
	// A missing key yields an empty result; report it instead of panicking.
	if len(resp.Kvs) == 0 {
		return 0, errors.New("sequence key " + strconv.Quote(idGen.key) + " not found")
	}

	next, err := strconv.ParseInt(string(resp.Kvs[0].Value), 10, 64)
	if err != nil {
		return 0, err
	}
	next++

	if _, err := client.Put(ctx, idGen.key, strconv.FormatInt(next, 10)); err != nil {
		return 0, err
	}
	return next, nil
}
// nextWithSTMReapeatable increments the counter stored at idGen.key inside a
// repeatable-read STM transaction and returns the new value.
//
// All attempts share one context bounded by deadline. A failed transaction is
// attempted again, up to retry times in total; the loop stops early once the
// context is done because no later attempt could succeed. When every attempt
// fails, the returned error includes the message of the last underlying
// failure instead of hiding it.
func (idGen *SeqIDGenerator) nextWithSTMReapeatable() (int64, error) {
	ctx, cancel := context.WithTimeout(context.Background(), deadline)
	defer cancel()

	var next int64
	// The STM runner may invoke this function more than once, so next is
	// recomputed from the freshly read value on every invocation.
	increment := func(s CONC.STM) error {
		current, parseErr := strconv.ParseInt(s.Get(idGen.key), 10, 64)
		if parseErr != nil {
			return parseErr
		}
		next = current + 1
		s.Put(idGen.key, strconv.FormatInt(next, 10))
		return nil
	}

	var lastErr error
	for attempt := 0; attempt < retry; attempt++ {
		resp, err := CONC.NewSTMRepeatable(ctx, client, increment)
		if err == nil && resp.Succeeded {
			return next, nil
		}
		if err != nil {
			lastErr = err
		}
		if ctx.Err() != nil {
			// Deadline hit or context cancelled: further attempts cannot succeed.
			break
		}
	}

	if lastErr != nil {
		return 0, errors.New("ID gen failed. Retry exceeded: " + lastErr.Error())
	}
	return 0, errors.New("ID gen failed. Retry exceeded")
}
// Knobs for the stress helpers and the STM retry loops.
var (
	// n is the number of goroutines (or sequential rounds) a stress helper runs.
	n = 500
	// retry caps the number of attempts made by the STM-based generators.
	retry = 40 // move as conf
)
// stressMutex starts n goroutines that each request up to 10 IDs through the
// mutex-based generator, and blocks until all of them have finished. A
// goroutine logs the first error it hits and stops requesting further IDs.
func stressMutex(idGen *SeqIDGenerator) {
	const idsPerWorker = 10

	var wg sync.WaitGroup
	worker := func() {
		defer wg.Done()
		for issued := 0; issued < idsPerWorker; issued++ {
			if _, err := idGen.nextWithMutex(); err != nil {
				glog.Errorln("err:", err)
				return
			}
		}
	}

	wg.Add(n)
	for started := 0; started < n; started++ {
		go worker()
	}
	wg.Wait()
}
// stressMutexSeq requests n rounds of 10 IDs, one after another on the calling
// goroutine, through the mutex-based generator. Errors are logged and the run
// carries on with the next request.
func stressMutexSeq(idGen *SeqIDGenerator) {
	const idsPerRound = 10

	for round := 0; round < n; round++ {
		for issued := 0; issued < idsPerRound; issued++ {
			if _, err := idGen.nextWithMutex(); err != nil {
				glog.Errorln("err:", err)
			}
		}
	}
}
// stressSTMSerialiazableSeq requests n rounds of 10 IDs, one after another on
// the calling goroutine, through the serializable STM generator. Errors are
// logged and the run carries on with the next request.
func stressSTMSerialiazableSeq(idGen *SeqIDGenerator) {
	const idsPerRound = 10

	for round := 0; round < n; round++ {
		for issued := 0; issued < idsPerRound; issued++ {
			if _, err := idGen.nextWithSTMSerialiazable(); err != nil {
				glog.Errorln("err:", err)
			}
		}
	}
}
// stressSTMReapeatableSeq requests n rounds of 10 IDs, one after another on
// the calling goroutine, through the repeatable-read STM generator. Errors are
// logged and the run carries on with the next request.
func stressSTMReapeatableSeq(idGen *SeqIDGenerator) {
	const idsPerRound = 10

	for round := 0; round < n; round++ {
		for issued := 0; issued < idsPerRound; issued++ {
			if _, err := idGen.nextWithSTMReapeatable(); err != nil {
				glog.Errorln("err:", err)
			}
		}
	}
}
// stressSTMSerialiazable starts n goroutines that each request 10 IDs through
// the serializable STM generator, waits for all of them, and logs how many
// requests succeeded. A failed request is logged and does not stop its
// goroutine.
func stressSTMSerialiazable(idGen *SeqIDGenerator) {
	const idsPerWorker = 10

	var (
		wg      sync.WaitGroup
		success int64
	)
	worker := func() {
		defer wg.Done()
		for issued := 0; issued < idsPerWorker; issued++ {
			if _, err := idGen.nextWithSTMSerialiazable(); err != nil {
				glog.Errorln("err:", err)
				continue
			}
			atomic.AddInt64(&success, 1)
		}
	}

	wg.Add(n)
	for started := 0; started < n; started++ {
		go worker()
	}
	wg.Wait()
	glog.Errorln("success:", success)
}
// stressSTMReapeatable starts n goroutines that each request 10 IDs through
// the repeatable-read STM generator, waits for all of them, and logs how many
// requests succeeded. A failed request is logged and does not stop its
// goroutine.
func stressSTMReapeatable(idGen *SeqIDGenerator) {
	const idsPerWorker = 10

	var (
		wg      sync.WaitGroup
		success int64
	)
	worker := func() {
		defer wg.Done()
		for issued := 0; issued < idsPerWorker; issued++ {
			if _, err := idGen.nextWithSTMReapeatable(); err != nil {
				glog.Errorln("err:", err)
				continue
			}
			atomic.AddInt64(&success, 1)
		}
	}

	wg.Add(n)
	for started := 0; started < n; started++ {
		go worker()
	}
	wg.Wait()
	glog.Errorln("success:", success)
}