1

尝试将 Protobuf 与 RSocket 一起使用时,即使把 dataMimeType 设置为 application/protobuf 或 application/vnd.google.protobuf,Requester 也不会考虑该设置。我收到的错误是 No decoder。

客户端应用程序

@SpringBootApplication
@Slf4j
public class PersonServiceClientApplication {

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                .rsocketFactory(
                        factory -> factory.dataMimeType("application/protobuf").frameDecoder(PayloadDecoder.ZERO_COPY))
                .rsocketStrategies(rSocketStrategies).connectTcp("localhost", 9080).retry().block();
    }

    @Bean
    public RSocketStrategiesCustomizer protobufRSocketStrategyCustomizer() {
        return (strategy) -> {
            strategy.decoder(new ProtobufDecoder());
            strategy.encoder(new ProtobufEncoder());
        };
    }

    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(PersonServiceClientApplication.class, args);
        RSocketRequester req = context.getBean(RSocketRequester.class);
        req.route("io.github.kprasad99.person.get3").retrieveMono(Person.class)
                .doOnNext(e -> log.info(e.getFirstName())).block(Duration.ofSeconds(30));
    }

}

堆栈跟踪

Exception in thread "restartedMain" java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: java.lang.IllegalArgumentException: No decoder for io.github.kprasad.person.proto.PersonProto$Person
    at org.springframework.messaging.rsocket.RSocketStrategies.decoder(RSocketStrategies.java:92)
    at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveMono(DefaultRSocketRequester.java:274)
    at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveMono(DefaultRSocketRequester.java:258)
    at io.github.kprasad99.person.PersonServiceClientApplication.main(PersonServiceClientApplication.java:69)
    ... 5 more

但是,如果我显式禁用 RSocketStrategiesAutoConfiguration 并自行重新创建 RSocketStrategies bean,它就可以正常工作。

SpringBootApplication(exclude = { RSocketStrategiesAutoConfiguration.class })
@Slf4j
public class PersonServiceClientApplication {

    private static final String PATHPATTERN_ROUTEMATCHER_CLASS = "org.springframework.web.util.pattern.PathPatternRouteMatcher";

    @Bean
    public RSocketStrategies rSocketStrategies(ObjectProvider<RSocketStrategiesCustomizer> customizers) {
        RSocketStrategies.Builder builder = RSocketStrategies.builder();
        if (ClassUtils.isPresent(PATHPATTERN_ROUTEMATCHER_CLASS, null)) {
            builder.routeMatcher(new PathPatternRouteMatcher());
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
        return builder.build();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                .rsocketFactory(
                        factory -> factory.dataMimeType("application/protobuf").frameDecoder(PayloadDecoder.ZERO_COPY))
                .rsocketStrategies(rSocketStrategies).connectTcp("localhost", 9080).retry().block();
    }

    @Bean
    public RSocketStrategiesCustomizer protobufRSocketStrategyCustomizer() {
        return (strategy) -> {
            strategy.decoder(new ProtobufDecoder());
            strategy.encoder(new ProtobufEncoder());
        };
    }

    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(PersonServiceClientApplication.class, args);
        RSocketRequester req = context.getBean(RSocketRequester.class);
        req.route("io.github.kprasad99.person.get3").retrieveMono(Person.class)
                .doOnNext(e -> log.info(e.getFirstName())).block(Duration.ofSeconds(30));
    }

}

为什么在解码时既没有考虑 dataMimeType,也没有考虑 protobufRSocketStrategyCustomizer?

4

1 回答 1

1

我已经解决了这个问题:需要使用 application/x-protobuf 作为数据 MIME 类型。下面是为客户端创建的 bean。

    /**
     * Exposes a lazily connected requester that uses {@code application/x-protobuf}
     * as its data MIME type.
     *
     * @param rsocketRequesterBuilder the requester builder to configure
     * @param clientTransport the transport used to connect
     * @param strategies the strategies applied to the requester
     * @return a {@code Mono} emitting the requester, with signal logging enabled
     */
    @Bean
    public Mono<RSocketRequester> rSocketRequester(
            RSocketRequester.Builder rsocketRequesterBuilder,
            ClientTransport clientTransport,
            RSocketStrategies strategies) {
        return rsocketRequesterBuilder
                .rsocketStrategies(strategies)
                .dataMimeType(new MimeType("application", "x-protobuf"))
                .connect(clientTransport)
                .log();
    }

    /**
     * Contributes a Protobuf decoder and encoder to the strategies builder.
     *
     * @return the customizer registering the Protobuf codecs
     */
    @Bean
    public RSocketStrategiesCustomizer protobufRSocketStrategyCustomizer() {
        return strategiesBuilder -> {
            strategiesBuilder.decoder(new ProtobufDecoder());
            strategiesBuilder.encoder(new ProtobufEncoder());
        };
    }
于 2020-07-13T05:07:30.810 回答