我们正在开发一个高频交易平台,并在我们的一个组件中使用 golang 实现了 grpc。我们需要在我们的一个用例中使用双向流,我们在下面的代码中做了一个示例实现,但是当我们通过检查日志的时间戳之间的差异来测试代码的性能时
Recv Time %v Index: %v Num: %v
Send Time %v, Index: %v, Num: %v
我们发现从客户端调用流的.Send方法并通过在服务器端调用.Recv接收相同的数据大约需要 400-800 微秒,这对我们来说太短了。我们需要最大 10-50 微秒的性能,当我们阅读指南时,我们看到如果客户端和服务器都在同一台计算机上,grpc 可以达到纳秒(这正是我们的情况)
所以我认为我们缺少一些选项或一些性能技巧。有谁知道我们可以做些什么来增加这个性能问题
客户代码:
package main
import (
"context"
"log"
"math/rand"
pb "github.com/pahanini/go-grpc-bidirectional-streaming-example/src/proto"
"time"
"google.golang.org/grpc"
)
func main() {
rand.Seed(time.Now().Unix())
// dail server
conn, err := grpc.Dial(":50005", grpc.WithInsecure())
if err != nil {
log.Fatalf("can not connect with server %v", err)
}
// create stream
client := pb.NewMathClient(conn)
stream, err := client.Max(context.Background())
if err != nil {
log.Fatalf("openn stream error %v", err)
}
var max int32
ctx := stream.Context()
done := make(chan bool)
msgCount := 100
fromMsg := 0
// first goroutine sends random increasing numbers to stream
// and closes int after 10 iterations
go func() {
for i := 1; i <= msgCount; i++ {
// generate random nummber and send it to stream
rnd := int32(i)
req := pb.Request{Num: rnd}
if i-1 >= fromMsg {
sendTime := time.Now().UnixNano()
log.Printf("Send Time %v, Index: %v, Num: %v", sendTime,i-1,req.Num)
}
if err := stream.Send(&req); err != nil {
log.Fatalf("can not send %v", err)
}
//afterSendTime := time.Now().UnixNano()
//log.Printf("After Send Time %v", afterSendTime)
//log.Printf("---------------")
//log.Printf("%d sent", req.Num)
//time.Sleep(time.Millisecond * 200)
}
if err := stream.CloseSend(); err != nil {
log.Println(err)
}
}()
// third goroutine closes done channel
// if context is done
go func() {
<-ctx.Done()
if err := ctx.Err(); err != nil {
log.Println(err)
}
close(done)
}()
<-done
log.Printf("finished with max=%d", max)
}
服务器代码:
package main
import (
"io"
"log"
"net"
"time"
pb "github.com/pahanini/go-grpc-bidirectional-streaming-example/src/proto"
"google.golang.org/grpc"
)
type server struct{}
func (s server) Max(srv pb.Math_MaxServer) error {
log.Println("start new server")
var max int32
ctx := srv.Context()
i := 0
fromMsg := 0
for {
// exit if context is done
// or continue
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// receive data from stream
req, err := srv.Recv()
if err == io.EOF {
// return will close stream from server side
log.Println("exit")
return nil
}
if err != nil {
log.Printf("receive error %v", err)
continue
}
if i >= fromMsg {
recvTime := time.Now().UnixNano()
log.Printf("Recv Time %v Index: %v Num: %v", recvTime,i,req.Num)
}
i++
// continue if number reveived from stream
// less than max
if req.Num <= max {
continue
}
// update max and send it to stream
/*
max = req.Num
resp := pb.Response{Result: max}
if err := srv.Send(&resp); err != nil {
log.Printf("send error %v", err)
}
*/
//log.Printf("send new max=%d", max)
}
}
func main() {
// create listiner
lis, err := net.Listen("tcp", ":50005")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// create grpc server
s := grpc.NewServer()
pb.RegisterMathServer(s, server{})
// and start...
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}