0

我是春季集成的新手。

我有一个流程,我需要根据某些条件执行 http 或 tcp 调用。我关注的问题与 http 调用有关。调用的其余端点需要一个 accessToken 作为用于身份验证的标头参数,该参数由具有 2 个方法getCurrentAccessToken()refreshAccessToken(). 我只想在过期accessToken时调用方法刷新。currentAccessToken

我想做的是在执行对rest api的调用时添加以下逻辑:

如果令牌已过期,其余端点将返回 401,我想在流程中拦截此错误并通过添加刷新的访问令牌重试请求。

  @Bean
  public IntegrationFlow clientIn(AbstractServerConnectionFactory server,
      AbstractClientConnectionFactory client, LogService logService) {

    return IntegrationFlows.from(Tcp.inboundAdapter(client)
        .enrichHeaders(t -> t.headerFunction(IpHeaders.CONNECTION_ID, message -> this.client, true))
        .log(msg -> "client: " + logService.log(msg))
        .<byte[], Boolean>route(this::shouldForwardToHttp,
            mapping -> mapping.subFlowMapping(true, sf -> sf
                .enrichHeaders(t -> t.header("Content-Type", MimeTypeUtils.APPLICATION_JSON_VALUE))
                .<byte[], RequestMessage>transform(this::buildRequestFromMessage)
                .<RequestMessage, HttpEntity>transform(this::getHttpEntity)
                .handle(Http.outboundGateway(restUrl).httpMethod(HttpMethod.POST)
                    .expectedResponseType(ResponseMessage.class))
                .<ResponseMessage, byte[]>transform(p -> this.transformResponse(p))
                .handle(Tcp.outboundAdapter(client))).subFlowMapping(false,
                    t -> t.handle(Tcp.outboundAdapter(server).retryInterval(1000))))
        .get();
  }
 HttpEntity getHttpEntity(RequestMessage request) {

    MultiValueMap<String, String> mv = new HttpHeaders();
    mv.add("accessToken", tokenProvider.getCurrentAccessToken());
    HttpEntity entity = new HttpEntity(request, mv);
    return entity;
  }

我尝试通过添加requestHandlerRetry建议并将其重定向到 a recoveryChannel,但我无法将某些内容返回给调用者流以获取带有状态代码的响应并使用新的accessToken.

关于如何实现这一点的任何想法?

4

1 回答 1

0

我认为您不需要重试建议,因为您肯定是通过捕获该 401 异常并使用刷新的令牌调用服务来模拟它。听起来更像是递归。为了正确实现它,我建议查看ExpressionEvaluatingRequestHandlerAdvicehttps ://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain 。是的,它类似于重试,它也有failureChannel,但是没有内置的重试,因为我们将模拟它在必要时一次又一次地调用相同的端点。

为了简化递归逻辑,我会将其提取.handle(Http.outboundGateway(restUrl).httpMethod(HttpMethod.POST) .expectedResponseType(ResponseMessage.class))到一个单独的流程中,并gateway()在主流程中为该流程使用带有输入通道的 a。

failureChannel流应将其消息重新路由回网关流的输入。

该逻辑中最重要的部分是进行所有原始请求消息头,其中包括网关逻辑所需的replyChannel

查看更多关于网关的文档:https ://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway 。

当 anExpressionEvaluatingRequestHandlerAdvice向 发送消息时failureChannel,它以ErrorMessage带有 aMessageHandlingExpressionEvaluatingAdviceException作为有效负载的形式出现。导致失败并具有所有必需标头的消息在getFailedMessage()属性中。因此,您获取该消息,请求新令牌,将其添加到基于该原始消息的新消息的标头中。最后,您将此新消息发送到IntegrationFlowHTTP 请求的输入通道。当一切正常时,HTTP 调用的结果将被转发到提到replyChannel的 from 标头,因此转发到后续步骤的主要流程。

于 2021-05-18T15:02:13.837 回答