7

我们正在开发一个高频交易平台,并在我们的一个组件中使用 golang 实现了 grpc。我们需要在我们的一个用例中使用双向流,我们在下面的代码中做了一个示例实现,但是当我们通过检查日志的时间戳之间的差异来测试代码的性能时

Recv Time %v Index: %v Num: %v
Send Time %v, Index: %v, Num: %v

我们发现从客户端调用流的.Send方法并通过在服务器端调用.Recv接收相同的数据大约需要 400-800 微秒,这对我们来说太短了。我们需要最大 10-50 微秒的性能,当我们阅读指南时,我们看到如果客户端和服务器都在同一台计算机上,grpc 可以达到纳秒(这正是我们的情况)

所以我认为我们缺少一些选项或一些性能技巧。有谁知道我们可以做些什么来增加这个性能问题

客户代码:

package main

import (
    "context"
    "log"
    "math/rand"

    pb "github.com/pahanini/go-grpc-bidirectional-streaming-example/src/proto"

    "time"

    "google.golang.org/grpc"
)

func main() {
    rand.Seed(time.Now().Unix())

    // dail server
    conn, err := grpc.Dial(":50005", grpc.WithInsecure())
    if err != nil {
        log.Fatalf("can not connect with server %v", err)
    }

    // create stream
    client := pb.NewMathClient(conn)
    stream, err := client.Max(context.Background())
    if err != nil {
        log.Fatalf("openn stream error %v", err)
    }

    var max int32
    ctx := stream.Context()
    done := make(chan bool)
    msgCount := 100
    fromMsg := 0

    // first goroutine sends random increasing numbers to stream
    // and closes int after 10 iterations
    go func() {
        for i := 1; i <= msgCount; i++ {
            // generate random nummber and send it to stream
            rnd := int32(i)
            req := pb.Request{Num: rnd}
            if i-1 >= fromMsg {
                sendTime := time.Now().UnixNano()
                log.Printf("Send Time %v, Index: %v, Num: %v", sendTime,i-1,req.Num)
            }

            if err := stream.Send(&req); err != nil {
                log.Fatalf("can not send %v", err)
            }
            //afterSendTime := time.Now().UnixNano()
            //log.Printf("After Send Time %v", afterSendTime)
            //log.Printf("---------------")
            //log.Printf("%d sent", req.Num)
            //time.Sleep(time.Millisecond * 200)
        }
        if err := stream.CloseSend(); err != nil {
            log.Println(err)
        }
    }()

    // third goroutine closes done channel
    // if context is done
    go func() {
        <-ctx.Done()
        if err := ctx.Err(); err != nil {
            log.Println(err)
        }
        close(done)
    }()

    <-done
    log.Printf("finished with max=%d", max)
}

服务器代码:

package main

import (
    "io"
    "log"
    "net"
    "time"

    pb "github.com/pahanini/go-grpc-bidirectional-streaming-example/src/proto"

    "google.golang.org/grpc"
)

type server struct{}

func (s server) Max(srv pb.Math_MaxServer) error {

    log.Println("start new server")
    var max int32
    ctx := srv.Context()

    i := 0
    fromMsg := 0
    for {
        // exit if context is done
        // or continue
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        // receive data from stream
        req, err := srv.Recv()

        if err == io.EOF {
            // return will close stream from server side
            log.Println("exit")
            return nil
        }
        if err != nil {
            log.Printf("receive error %v", err)
            continue
        }

        if i >= fromMsg {
            recvTime := time.Now().UnixNano()
            log.Printf("Recv Time %v Index: %v Num: %v", recvTime,i,req.Num)
        }

        i++

        // continue if number reveived from stream
        // less than max
        if req.Num <= max {
            continue
        }

        // update max and send it to stream
        /*
            max = req.Num
            resp := pb.Response{Result: max}
            if err := srv.Send(&resp); err != nil {
                log.Printf("send error %v", err)
            }
        */
        //log.Printf("send new max=%d", max)
    }
}

func main() {
    // create listiner
    lis, err := net.Listen("tcp", ":50005")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    // create grpc server
    s := grpc.NewServer()
    pb.RegisterMathServer(s, server{})

    // and start...
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}
4

0 回答 0