抱歉,如果这是一个菜鸟问题,我是 grpc 的服务器端流媒体的新手。
我现在在流向客户端的服务器上的功能中拥有什么
req, err := http.NewRequest("GET", actualURL, nil)
//跳过一些行// res, _ := http.DefaultClient.Do(req)
// closing body
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
// 跳过一些行//
// unmarshaling the xml data received from the GET request done above
xmlDataErr := xml.Unmarshal(body, &ArrData)
// creating a variable of a custom type and creating object of specific struct Flight
var flightArrData ArrFlights
currArrData := flightArrData.Arr.Body.Flight
log.Println("计算要发送到客户端的总行数", len(currArrData))
// 循环响应并将其发送到客户端
对于 i := range currArrData {
farr := &pb.FlightArrivalsResponse{
ArrivalResponse: &pb.ArrivalFlightData{
Hapt: currArrData[i].HApt,
Fltnr: currArrData[i].Fltnr,
Sdt: currArrData[i].Sdt,
Acreg: currArrData[i].Acreg,
Park: currArrData[i].Park,
EstD: currArrData[i].EstD,
Gate: currArrData[i].Gate,
AblkD: currArrData[i].AblkD,
ActD: currArrData[i].ActD,
Callsign: currArrData[i].Callsign,
},
}
senderr := stream.Send(farr)
// skipping some lines //
}
// 完成后返回 nil return nil }
我面临的问题 在客户端我有接收此响应的功能,休眠 n 分钟并再次请求响应。
客户在第一次通话中确实得到了预期的响应,但是对于每个后续通话都会发生一些奇怪的事情,这是我当前的问题,我尝试以下面每个后续通话的形式进行说明:
从客户端调用 1 到服务器 --> 服务器返回 200 行
客户睡眠 n 分钟
从客户端调用 2 到服务器 --> 服务器返回 400 行!!(基本上每行是 200 + 200 的两倍)
客户睡眠 n 分钟
从客户端调用 3 到服务器 --> 服务器返回 600 行!!(200+200+200)
概括
我确实在客户端检查错误== io.EOF,现在停止从服务器到客户端的这种响应堆叠的唯一方法是停止服务器并重新启动。
我不确定我在这里遗漏了什么,以确保我只发送我从 GET 请求中收到的实际和准确的响应。将不胜感激任何提示。
更多信息
部分来自 proto 文件中的 gRPC protobuffer def
rpc GetFlightArrivals (FlightArrivalsRequestURL) returns (stream FlightArrivalsResponse) {
};
上述 gRPC 服务端 impl 的完整代码
func (s *Server) GetFlightArrivals(url *pb.FlightArrivalsRequestURL, stream pb.Flight_GetFlightArrivalsServer) error {
cData := ch.ConfigProcessor() // fetches the initial part of the URL from config file
if url.ArrivalURL == "" {
actualURL = cData.FURL + "/arr/all" // adding the arrival endpoint
} else {
actualURL = url.ArrivalURL
}
// build new request to get arrival data
req, err := http.NewRequest("GET", actualURL, nil)
if err != nil {
log.Fatalln("Recheck URL or connectivity, failed to make REST call")
}
req.Header.Add("Cache-Control", "no-cache")
req.Header.Add("Accept", "text/plain")
req.Header.Add("Connection", "keep-alive")
req.Header.Add("app_id", cData.AppID)
req.Header.Add("app_key", cData.AppKey)
req.Header.Add("Content-Type", "application/xml")
res, _ := http.DefaultClient.Do(req)
// closing body
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Fatalln("Failed to get any response")
return err
}
// unmarshaling the xml data
xmlDataErr := xml.Unmarshal(body, &flightArrData)
if xmlDataErr != nil {
log.Fatalln("Failed to unmarshal arrival xml data see error, ", xmlDataErr)
return xmlDataErr
}
currArrData := flightArrData.Arr.Body.Flight
log.Println("Counting total arrivals in Finland", len(currArrData))
log.Println("Starting FlightDeparturesResponse for client")
for i := range currArrData {
farr := &pb.FlightArrivalsResponse{
ArrivalResponse: &pb.ArrivalFlightData{
Hapt: currArrData[i].HApt,
Fltnr: currArrData[i].Fltnr,
Sdt: currArrData[i].Sdt,
Acreg: currArrData[i].Acreg,
Park: currArrData[i].Park,
EstD: currArrData[i].EstD,
Gate: currArrData[i].Gate,
AblkD: currArrData[i].AblkD,
ActD: currArrData[i].ActD,
Callsign: currArrData[i].Callsign,
},
}
senderr := stream.Send(farr)
if senderr != nil {
log.Fatalln("Failed to stream arrival response to the client, see error ", senderr)
return senderr
}
}
currArrData = nil
log.Println("Attempting to empty the arrival data")
return nil
}
检查上述 gRPC impl 的测试用例
我只是启动一个测试 grpc 服务器并在这个测试用例中调用该 grpc。客户端上的实现与接收数据相同。
func TestGetFlightArrivals(t *testing.T) {
const addr = "localhost:50051"
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
t.Fatalf("Did not connect: #{err}")
}
defer conn.Close()
f := pb.NewFlightClient(conn)
t.Run("GetFlightArrivals", func(t *testing.T) {
var res, err = f.GetFlightArrivals(context.Background(), &pb.FlightArrivalsRequestURL{ArrivalURL: ""})
if err != nil {
t.Error("Failed to make the REST call", err)
}
for {
msg, err := res.Recv()
if err == io.EOF {
t.Log("Finished reading all the message")
break
}
if err != nil {
t.Error("Failed to receive response")
}
t.Log("Message from the server", msg.GetArrivalResponse())
}
})
}