0

使用具有直接消息交换机制的 Micronaut RabbitMQ,在将值从侦听器返回给生产者时,出现如下所述的 JSON 解析器错误

08:37:58.637 [main] INFO  io.micronaut.runtime.Micronaut - Startup completed in 8231ms. Server Running: http://localhost:8080
08:38:05.170 [pool-2-thread-4] ERROR i.m.h.n.stream.HttpStreamsHandler - Error occurred writing stream response: Error decoding JSON stream for type [T]: Unexpected token (START_OBJECT), expected END_ARRAY: Attempted to unwrap 'com.example.domain.ProductViewModel' value from an array (with `DeserializationFeature.UNWRAP_SINGLE_VALUE_ARRAYS`) but it contains more than one value
 at [Source: (byte[])"[{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"na"[truncated 125 bytes]; line: 1, column: 80]
io.micronaut.core.serialize.exceptions.SerializationException: Error decoding JSON stream for type [T]: Unexpected token (START_OBJECT), expected END_ARRAY: Attempted to unwrap 'com.example.domain.ProductViewModel' value from an array (with `DeserializationFeature.UNWRAP_SINGLE_VALUE_ARRAYS`) but it contains more than one value
 at [Source: (byte[])"[{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"na"[truncated 125 bytes]; line: 1, column: 80]
    at io.micronaut.rabbitmq.serdes.JsonRabbitMessageSerDes.deserialize(JsonRabbitMessageSerDes.java:74)
    at io.micronaut.rabbitmq.intercept.RabbitMQIntroductionAdvice.deserialize(RabbitMQIntroductionAdvice.java:323)
    at io.micronaut.rabbitmq.intercept.RabbitMQIntroductionAdvice.lambda$intercept$22(RabbitMQIntroductionAdvice.java:268)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:132)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onNext(RxInstrumentedSubscriber.java:59)
    at io.reactivex.internal.operators.flowable.FlowableTimeoutTimed$TimeoutSubscriber.onNext(FlowableTimeoutTimed.java:101)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onNext(RxInstrumentedSubscriber.java:59)
    at io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:132)
    at io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onSuccess(SingleToFlowable.java:62)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSingleObserver.onSuccess(RxInstrumentedSingleObserver.java:65)
    at io.reactivex.internal.operators.single.SingleFlatMap$SingleFlatMapCallback$FlatMapSingleObserver.onSuccess(SingleFlatMap.java:111)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSingleObserver.onSuccess(RxInstrumentedSingleObserver.java:65)
    at io.reactivex.internal.operators.single.SingleDoFinally$DoFinallyObserver.onSuccess(SingleDoFinally.java:73)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSingleObserver.onSuccess(RxInstrumentedSingleObserver.java:65)
    at io.reactivex.internal.operators.single.SingleCreate$Emitter.onSuccess(SingleCreate.java:67)
    at io.micronaut.rabbitmq.reactive.RxJavaReactivePublisher$3.handleDelivery(RxJavaReactivePublisher.java:324)
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:832)
 

听众

@RabbitListener
public class ProductListener {
 @Queue(ProductTopicConstants.GET_PRODUCTS)
    public List<ProductViewModel> find(String text) {
        List<ProductViewModel> value = new ArrayList<>();
        value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
        value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
        value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
        value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
        value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
        value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
        value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
        value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
        return value;
    }
}

制片人

@RabbitClient(ProductTopicConstants.FETE_BIRD_EXCHANGE)
@RabbitProperty(name = "replyTo", value = "amq.rabbitmq.reply-to")
public interface IProductProducer {
    @Binding(ProductTopicConstants.GET_PRODUCTS)
    Flowable<ProductViewModel> find(String text);
}

控制器

@Get("/{text}")
    public Flowable<ProductViewModel> Find(String text) {
        return iproductProducer.find(text);
    }

对于单个项目列表,不会出现异常,但是,对于超过 1 个值,会出现异常。由于我使用的是Flowable它应该像一个LIST

4

0 回答 0