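We wrote a custom Predicate factory for Spring-Gateway to route requests. We parse the body of the XML request and then derive the route from a specific method present in the body. While doing this, we wrote the following code to create the ServerRequest.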

@Override
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
        return exchange -> {
            Class<String> inClass = String.class;

            Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);

            if (cachedBody != null) {
                try {
                    boolean test = config.pattern.matcher((String) cachedBody).matches();
                    exchange.getAttributes().put(TEST_ATTRIBUTE, test);
                    return Mono.just(test);
                } catch (ClassCastException e) {
                    LOG.error("Predicate test failed because String.class does not match the cached body object", e);
                }
                return Mono.just(false);
            } else {

                return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

                    DataBufferUtils.retain(dataBuffer);

                    Flux<DataBuffer> cachedFlux = Flux
                            .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                            .bodyToMono(inClass).doOnNext(objectValue -> {
                                exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                                exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachedFlux);
                            }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
                });

            }
        };
    }

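This solution worked perfectly with the older versions, Spring-Boot-Parent (2.1.7.RELEASE) and spring-cloud-dependencies (Greenwich.RELEASE). With the latest versions, Spring-Boot-Parent (2.3.1.RELEASE) and spring-cloud-dependencies (Hoxton.SR6), I get the following exception at request time. The gateway application itself starts normally, without any errors.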

Caused by: java.lang.ClassCastException: class reactor.core.publisher.FluxDefer cannot be cast to class org.springframework.core.io.buffer.PooledDataBuffer (reactor.core.publisher.FluxDefer and org.springframework.core.io.buffer.PooledDataBuffer are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @306a30c7)
        at org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter.lambda$filter$0(RemoveCachedBodyFilter.java:37) ~[spring-cloud-gateway-core-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156) ~[reactor-core-3.3.6.RELEASE.jar!/:3.3.6.RELEASE]
        ... 84 more

Has anyone else run into the same problem and knows how to solve it?

1 Answer

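The problem is that these APIs were still beta in the Greenwich release. The object now expected under CACHED_REQUEST_BODY_ATTR must be a PooledDataBuffer, which is why RemoveCachedBodyFilter fails with a ClassCastException when it finds a Flux there. I changed my code accordingly, and it now looks like this: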

return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

                    DataBufferUtils.retain(dataBuffer);

                    Flux<DataBuffer> cachedFlux = Flux
                            .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

                    PooledDataBuffer cachePool = (PooledDataBuffer) dataBuffer.slice(0, dataBuffer.readableByteCount());
                    
                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                            .bodyToMono(inClass).doOnNext(objectValue -> {
                                exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                                exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachePool);
                            }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
                });

After updating the class, it now works as expected.
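
To illustrate why the original code failed, here is a minimal plain-Java sketch of the mechanism visible in the stack trace: a cleanup step removes an attribute and casts it to a releasable buffer type. Everything in it is a hypothetical stand-in (CachedBodyCastDemo, Releasable, FakePooledBuffer, FakeDeferredBody, and the key string); it does not use the Spring classes and is not the actual RemoveCachedBodyFilter source.

```java
import java.util.HashMap;
import java.util.Map;

public class CachedBodyCastDemo {

    // Hypothetical attribute key; not the real Spring constant value.
    static final String CACHED_BODY_KEY = "cachedRequestBody";

    // Hypothetical stand-in for a pooled, releasable buffer type.
    interface Releasable {
        void release();
    }

    // Stand-in for the buffer the cleanup step expects to find.
    static final class FakePooledBuffer implements Releasable {
        boolean released = false;

        @Override
        public void release() {
            released = true;
        }
    }

    // Stand-in for the deferred body stream that the original code stored.
    static final class FakeDeferredBody {
    }

    // Mimics a cleanup step that assumes the attribute is releasable.
    // Returns true if cleanup succeeded, false if the cast failed.
    static boolean cleanup(Map<String, Object> attributes) {
        try {
            Releasable buffer = (Releasable) attributes.remove(CACHED_BODY_KEY);
            if (buffer != null) {
                buffer.release();
            }
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> attributes = new HashMap<>();

        // Storing the stream object reproduces the failing cast.
        attributes.put(CACHED_BODY_KEY, new FakeDeferredBody());
        System.out.println("stream stored, cleanup ok: " + cleanup(attributes));

        // Storing the buffer itself lets the cleanup release it.
        attributes.put(CACHED_BODY_KEY, new FakePooledBuffer());
        System.out.println("buffer stored, cleanup ok: " + cleanup(attributes));
    }
}
```

The design point is that the attribute's runtime type is an implicit contract between whoever writes it and whoever cleans it up, so the fix is to store the type the consumer expects rather than to change the consumer.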

Answered 2020-07-29T10:34:26.307