0

抱歉,如果这是一个菜鸟问题,我是 grpc 的服务器端流媒体的新手。

我现在在流向客户端的服务器上的功能中拥有什么

req, err := http.NewRequest("GET", actualURL, nil)

//跳过一些行// res, _ := http.DefaultClient.Do(req)

// closing body
defer res.Body.Close()

body, err := ioutil.ReadAll(res.Body)

// 跳过一些行//

// unmarshaling the xml data received from the GET request done above
xmlDataErr := xml.Unmarshal(body, &ArrData)

// creating a variable of a custom type and creating object of specific struct Flight
var flightArrData ArrFlights
currArrData := flightArrData.Arr.Body.Flight

log.Println("计算要发送到客户端的总行数", len(currArrData))

// 循环响应并将其发送到客户端

对于 i := range currArrData {

    farr := &pb.FlightArrivalsResponse{
        ArrivalResponse: &pb.ArrivalFlightData{
            Hapt:     currArrData[i].HApt,
            Fltnr:    currArrData[i].Fltnr,
            Sdt:      currArrData[i].Sdt,
            Acreg:    currArrData[i].Acreg,
            Park:     currArrData[i].Park,
            EstD:     currArrData[i].EstD,
            Gate:     currArrData[i].Gate,
            AblkD:    currArrData[i].AblkD,
            ActD:     currArrData[i].ActD,
            Callsign: currArrData[i].Callsign,
        },
    }

    senderr := stream.Send(farr)

    // skipping some lines //
}

// 完成后返回 nil return nil }

我面临的问题 在客户端我有接收此响应的功能,休眠 n 分钟并再次请求响应。

客户在第一次通话中确实得到了预期的响应,但是对于每个后续通话都会发生一些奇怪的事情,这是我当前的问题,我尝试以下面每个后续通话的形式进行说明:

从客户端调用 1 到服务器 --> 服务器返回 200 行

客户睡眠 n 分钟

从客户端调用 2 到服务器 --> 服务器返回 400 行!!(基本上每行是 200 + 200 的两倍)

客户睡眠 n 分钟

从客户端调用 3 到服务器 --> 服务器返回 600 行!!(200+200+200)

概括

我确实在客户端检查错误== io.EOF,现在停止从服务器到客户端的这种响应堆叠的唯一方法是停止服务器并重新启动。

我不确定我在这里遗漏了什么,以确保我只发送我从 GET 请求中收到的实际和准确的响应。将不胜感激任何提示。

更多信息

部分来自 proto 文件中的 gRPC protobuffer def

rpc GetFlightArrivals (FlightArrivalsRequestURL) returns (stream FlightArrivalsResponse) {
    };

上述 gRPC 服务端 impl 的完整代码

func (s *Server) GetFlightArrivals(url *pb.FlightArrivalsRequestURL, stream pb.Flight_GetFlightArrivalsServer) error {

    cData := ch.ConfigProcessor() // fetches the initial part of the URL from config file

    if url.ArrivalURL == "" {
        actualURL = cData.FURL + "/arr/all" // adding the arrival endpoint

    } else {
        actualURL = url.ArrivalURL
    }

    // build new request to get arrival data
    req, err := http.NewRequest("GET", actualURL, nil)
    if err != nil {
        log.Fatalln("Recheck URL or connectivity, failed to make REST call")
    }
    req.Header.Add("Cache-Control", "no-cache")
    req.Header.Add("Accept", "text/plain")
    req.Header.Add("Connection", "keep-alive")
    req.Header.Add("app_id", cData.AppID)
    req.Header.Add("app_key", cData.AppKey)
    req.Header.Add("Content-Type", "application/xml")


    res, _ := http.DefaultClient.Do(req)

    // closing body
    defer res.Body.Close()

    body, err := ioutil.ReadAll(res.Body)
    if err != nil {
        log.Fatalln("Failed to get any response")
        return err
    }
    // unmarshaling the xml data
    xmlDataErr := xml.Unmarshal(body, &flightArrData)
    if xmlDataErr != nil {
        log.Fatalln("Failed to unmarshal arrival xml data see error, ", xmlDataErr)
        return xmlDataErr
    }

    currArrData := flightArrData.Arr.Body.Flight
    log.Println("Counting total arrivals in Finland", len(currArrData))
    log.Println("Starting FlightDeparturesResponse for client")
    for i := range currArrData {

        farr := &pb.FlightArrivalsResponse{
            ArrivalResponse: &pb.ArrivalFlightData{
                Hapt:     currArrData[i].HApt,
                Fltnr:    currArrData[i].Fltnr,
                Sdt:      currArrData[i].Sdt,
                Acreg:    currArrData[i].Acreg,
                Park:     currArrData[i].Park,
                EstD:     currArrData[i].EstD,
                Gate:     currArrData[i].Gate,
                AblkD:    currArrData[i].AblkD,
                ActD:     currArrData[i].ActD,
                Callsign: currArrData[i].Callsign,
            },
        }

        senderr := stream.Send(farr)

        if senderr != nil {
            log.Fatalln("Failed to stream arrival response to the client, see error ", senderr)
            return senderr
        }

    }
    currArrData = nil
    log.Println("Attempting to empty the arrival data")
    return nil
}

检查上述 gRPC impl 的测试用例

我只是启动一个测试 grpc 服务器并在这个测试用例中调用该 grpc。客户端上的实现与接收数据相同。

func TestGetFlightArrivals(t *testing.T) {
    const addr = "localhost:50051"
    conn, err := grpc.Dial(addr, grpc.WithInsecure())
    if err != nil {
        t.Fatalf("Did not connect: #{err}")
    }
    defer conn.Close()
    f := pb.NewFlightClient(conn)

    t.Run("GetFlightArrivals", func(t *testing.T) {

        var res, err = f.GetFlightArrivals(context.Background(), &pb.FlightArrivalsRequestURL{ArrivalURL: ""})
        if err != nil {
            t.Error("Failed to make the REST call", err)
        }

        for {
            msg, err := res.Recv()
            if err == io.EOF {
                t.Log("Finished reading all the message")
                break
            }
            if err != nil {
                t.Error("Failed to receive response")
            }
            t.Log("Message from the server", msg.GetArrivalResponse())

        }
    })
}

4

1 回答 1

0

根据@rubens21 的评论,我添加了以下几行 var emptyArrData ArrFlights,这基本上是完整的 XML 结构,并将这个 emptyArrData 分配给该结构flightArrData = emptyArrData

于 2020-03-04T13:36:03.487 回答