0

我正在尝试使用 rxjs.Observable 包装 grpc-web 服务器流客户端,并且如果说服务器返回错误,则能够执行重试。

考虑以下代码。

  // server
  foo = (call: (call: ServerWritableStream<FooRequest, Empty>): void => {
    if (!call.request?.getMessage()) {
      call.emit("error", { code: StatusCode.FAILED_PRECONDITION, message: "Invalid request" })
    }

    for (let i = 0; i <= 2; i++) {
      call.write(new FooResponse())
    }
    call.end()
  }
 
  // client 
  test("should not end on retry", (done) => {
    new Observable(obs => {
      const call =  new FooClient("http://localhost:8080").foo(new FooRequest())
      call.on("data", data => obs.next(data))
      call.on("error", err => {
        console.log("server emitted error")
        obs.error(err)
      })
      call.on("end", () => {
        console.log("server emitted end")
        obs.complete()
      })
    })
    .pipe(retryWhen(<custom retry policy>))
    .subscribe(
        _resp => () => {},
        _error => {
          console.log("source observable error")
          done()
        },
        () => {
          console.log("source observable completed(?)")
          done()
        })
  })
  
  // output
  server emitted error
  server emitted end
  source observable completed(?)

服务器在(?)发出“错误”之后发出“结束”事件,所以看起来我必须从源 observable 中删除“结束”处理程序。

结束/完成流的“Rx​​-y”方式是什么?

4

1 回答 1

1

对于任何有兴趣的人,我最终删除了“结束”事件处理程序并将其替换为“状态”,如果服务器返回OK状态代码(表示流结束),则可观察对象完成。

new Observable(obs => {
  const call =  new FooClient("http://localhost:8080").foo(new FooRequest())
  call.on("data", data => obs.next(data))
  call.on("error", err => obs.error(err))
  call.on("status", status: grpcWeb.Status => {
    if (status.code == grpcWeb.StatusCode.OK) {
      return observer.complete()
    }
  })
})
于 2020-10-16T05:53:22.640 回答