0

我正在调用一个返回 a 的 API Future of Scala,并且我想转换为Reactor Flux显然没有阻塞并等待未来的响应。我正在尝试使用 EmitterProcessor,当未来成功时它工作正常,但不幸的是,当未来有超时时它不是。

这里的代码

private Flux<D> transformFutureToFlux(Future<T> future) {
    EmitterProcessor<D> emitterProcessor = EmitterProcessor.create();
    future.onComplete(getOnCompleteFunction(emitterProcessor), defaultExecutionContext());
    return Flux.from(emitterProcessor);
}

private <D> OnComplete getOnCompleteFunction(EmitterProcessor<D> emitterProcessor) {
    return new OnComplete() {
        @Override
        public void onComplete(Throwable t, Object result) {
            if (t != null) {
                processError(t);
            } else {
                processSucceed(result);
            }
        }

        private void processSucceed(Object result) {
            if (result instanceof ConnectorErrorVO) {
                publishConnectorErrorException((ConnectorErrorVO) result);
            } else {
                publishGenericResponse(result);
            }
        }

        private void publishGenericResponse(Object result) {
            if (result instanceof List<?>) {
                Flux.fromIterable((List<D>) result).subscribe(emitterProcessor);
            } else {
                Flux.just((D) result).subscribe(emitterProcessor);
            }
        }

        private void publishConnectorErrorException(ConnectorErrorVO result) {
            ConnectorErrorVO connectorErrorVO = result;
            Flux<D> error = Flux.error(new ConnectorErrorException( String.valueOf(connectorErrorVO.getCode()), connectorErrorVO.getDescription(), connectorErrorVO.getError()));
            error.subscribe(emitterProcessor);
        }

        private void processError(Throwable t) {
            ConnectorManagerExecutor.logger.error(null, "Error and recovery from connector manager transaction", t);
            if (t instanceof AskTimeoutException) {
                Flux.<D>error(new ConnectorErrorException("407", "connector timeout", ConnectorError.TIMEOUT)).subscribe(emitterProcessor);
            } else {
                Flux.<D>error(new ConnectorErrorException("500", t.getMessage(), ConnectorError.GENERIC)).subscribe(emitterProcessor);
            }
        }
    };

我正在尝试做的是正确的?,还有更好的方法吗?

问候

4

2 回答 2

1

如果您返回 a Future,则意味着只有 1 个返回,而不是一系列/返回值流。您可以根据需要转换为Flux,但为什么不改用Mono?

另外,如果您使用的是 scala,请尝试使用reactor-scala-extensions SMono.fromFuture

以 SMonoTest为例。

    import scala.concurrent.ExecutionContext.Implicits.global
    StepVerifier.create(SMono.fromFuture(Future[Long] {
      randomValue
    }))
      .expectNext(randomValue)
      .verifyComplete()
于 2019-10-11T06:15:44.060 回答
0

你试过Flux.create()吗?您无需使用处理器,而是注册一个调用提供的Sink方法的未来回调。例如。对于List您遍历列表并通过该sink.next(T)方法发出每个值的情况。

于 2019-10-03T12:52:18.140 回答