0

我的应用程序基本上是另一个服务的代理,我的应用程序将 JSON 转换为第三方服务所需的 SOAP,调用该服务,然后将 SOAP 响应转换为 JSON。有时我在发出请求后会收到OutOfMemoryError ,有时是在第一个请求时。

我需要以某种方式关闭Flowable吗?

我的 application.yml 中有以下配置供客户端使用

micronaut
  http:
    # https://docs.micronaut.io/latest/guide/index.html#clientConfiguration
    services:
      reqtest:
        url: http://10.1.111.59:6012
        path: /basic
        connect-timeout: 250ms
        connect-ttl: 250ms
        read-timeout: 2500ms
        #        Give the thread pool 1 second to shutdown
        shutdown-timeout: 1s
        #        Don't follow redirects
        follow-redirects: false
        #        Use a connection pool
        pool:
          enabled: true
          #          Not sure this is needed but set it to 50 for now
          max-connections: 50

这是我用来进行 http 调用的代码

            Maybe<ResponseEnvelope> response = service.makeRequest(request.convertToRequestEnvelope());
            ret = response.blockingGet().getBody().getProcessXmlMessageResponse().getMessageResult().getRateResponse();

这是我用来调用第三方服务器的服务

@Singleton
public class PassThruService {
    private static final String SOAP_ACTION_HEADER = "SOAPAction";
    private static final String SOAP_ACTION_HEADER_VALUE = "/ProcessXmlMessage";
    private static final Map<CharSequence, CharSequence> requestHeaders = Map.of(
            SOAP_ACTION_HEADER, SOAP_ACTION_HEADER_VALUE,
            HttpHeaders.ACCEPT, MediaType.TEXT_XML,
            HttpHeaders.CONTENT_TYPE, MediaType.TEXT_XML
    );

    private final RxHttpClient httpClient;

    @Inject
    public PassThruService(final @Client(reqtest) RxHttpClient httpClient) {
        this.httpClient = httpClient;
    }

    public Maybe<ResponseEnvelope> makeRequest(final RequestEnvelope payload) {
        // URI is part of the http client, so we pass an empty string here.
        HttpRequest<?> request = HttpRequest.POST("", payload)
                .headers(requestHeaders);
        final AtomicInteger tries = new AtomicInteger(1);
        log.info("making request.");
        Flowable<ResponseEnvelope> return = httpClient.retrieve(request, ResponseEnvelope.class);
        return return
                .doOnRequest(onRequest -> log.info("request={} started.", tries.get()))
                .doOnNext(next -> log.info("request={} onNext.", tries.get()))
                .doOnError(error -> log.error("request={} Received an error message={}",
                        tries.get(), error.getMessage()))
                .doFinally(() -> log.info("request={} completed", tries.getAndIncrement()))
                .firstElement();
    }
}

我添加doOnXXX主要是为了让我了解发生了什么。但是正如我在上面的代码中所说的那样,我会在控制台中收到以下错误,它看起来并不致命,因为服务器一直在运行,但它令人担忧:

java.lang.OutOfMemoryError: Java heap space
    at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
    at java.base/java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:172)
    at java.base/java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:538)
    at java.base/java.lang.StringBuilder.append(StringBuilder.java:174)
    at io.micronaut.servlet.http.ServletHttpHandler.encodeResponse(ServletHttpHandler.java:711)
    at io.micronaut.servlet.http.ServletHttpHandler.lambda$null$10(ServletHttpHandler.java:463)
    at io.micronaut.servlet.http.ServletHttpHandler$$Lambda$627/0x000000080081ac40.call(Unknown Source)
    at io.reactivex.internal.operators.flowable.FlowableFromCallable.call(FlowableFromCallable.java:55)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:143)
    at io.reactivex.internal.operators.flowable.FlowableCreate$LatestAsyncEmitter.drain(FlowableCreate.java:687)
    at io.reactivex.internal.operators.flowable.FlowableCreate$LatestAsyncEmitter.onNext(FlowableCreate.java:613)
    at io.micronaut.servlet.http.ServletHttpHandler.lambda$invokeExceptionHandlerIfPossible$29(ServletHttpHandler.java:864)
    at io.micronaut.servlet.http.ServletHttpHandler$$Lambda$661/0x00000008008d5440.subscribe(Unknown Source)
    at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:71)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.internal.operators.flowable.FlowableOnErrorNext.subscribeActual(FlowableOnErrorNext.java:40)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14855)
    at io.reactivex.Flowable.subscribe(Flowable.java:14774)
    at io.micronaut.servlet.http.ServletHttpHandler.subscribeToResponsePublisher(ServletHttpHandler.java:475)
    at io.micronaut.servlet.http.ServletHttpHandler.emitError(ServletHttpHandler.java:281)
    at io.micronaut.servlet.http.ServletHttpHandler.invokeExceptionHandlerIfPossible(ServletHttpHandler.java:861)
    at io.micronaut.servlet.http.ServletHttpHandler.handleException(ServletHttpHandler.java:807)
    at io.micronaut.servlet.http.ServletHttpHandler.lambda$subscribeToResponsePublisher$12(ServletHttpHandler.java:467)
    at io.micronaut.servlet.http.ServletHttpHandler$$Lambda$538/0x0000000800706c40.apply(Unknown Source)
    at io.reactivex.internal.operators.flowable.FlowableOnErrorNext$OnErrorNextSubscriber.onError(FlowableOnErrorNext.java:103)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.checkTerminate(FlowableFlatMap.java:567)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:374)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:147)

我正在调试这个

  • macOS 卡塔利娜
  • IntelliJ 2021.1.3(2021 年 6 月 30 日构建)
  • OpenJDK 运行时环境 AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
  • 微航海 v2.5.5
4

0 回答 0