使用具有直接消息交换机制的 Micronaut RabbitMQ,在将值从侦听器返回给生产者时,出现如下所述的 JSON 解析器错误
08:37:58.637 [main] INFO io.micronaut.runtime.Micronaut - Startup completed in 8231ms. Server Running: http://localhost:8080
08:38:05.170 [pool-2-thread-4] ERROR i.m.h.n.stream.HttpStreamsHandler - Error occurred writing stream response: Error decoding JSON stream for type [T]: Unexpected token (START_OBJECT), expected END_ARRAY: Attempted to unwrap 'com.example.domain.ProductViewModel' value from an array (with `DeserializationFeature.UNWRAP_SINGLE_VALUE_ARRAYS`) but it contains more than one value
at [Source: (byte[])"[{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"na"[truncated 125 bytes]; line: 1, column: 80]
io.micronaut.core.serialize.exceptions.SerializationException: Error decoding JSON stream for type [T]: Unexpected token (START_OBJECT), expected END_ARRAY: Attempted to unwrap 'com.example.domain.ProductViewModel' value from an array (with `DeserializationFeature.UNWRAP_SINGLE_VALUE_ARRAYS`) but it contains more than one value
at [Source: (byte[])"[{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"name","price":100.0,"description":"description"},{"id":"111-ffff-111","name":"na"[truncated 125 bytes]; line: 1, column: 80]
at io.micronaut.rabbitmq.serdes.JsonRabbitMessageSerDes.deserialize(JsonRabbitMessageSerDes.java:74)
at io.micronaut.rabbitmq.intercept.RabbitMQIntroductionAdvice.deserialize(RabbitMQIntroductionAdvice.java:323)
at io.micronaut.rabbitmq.intercept.RabbitMQIntroductionAdvice.lambda$intercept$22(RabbitMQIntroductionAdvice.java:268)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:132)
at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onNext(RxInstrumentedSubscriber.java:59)
at io.reactivex.internal.operators.flowable.FlowableTimeoutTimed$TimeoutSubscriber.onNext(FlowableTimeoutTimed.java:101)
at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onNext(RxInstrumentedSubscriber.java:59)
at io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:132)
at io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onSuccess(SingleToFlowable.java:62)
at io.micronaut.reactive.rxjava2.RxInstrumentedSingleObserver.onSuccess(RxInstrumentedSingleObserver.java:65)
at io.reactivex.internal.operators.single.SingleFlatMap$SingleFlatMapCallback$FlatMapSingleObserver.onSuccess(SingleFlatMap.java:111)
at io.micronaut.reactive.rxjava2.RxInstrumentedSingleObserver.onSuccess(RxInstrumentedSingleObserver.java:65)
at io.reactivex.internal.operators.single.SingleDoFinally$DoFinallyObserver.onSuccess(SingleDoFinally.java:73)
at io.micronaut.reactive.rxjava2.RxInstrumentedSingleObserver.onSuccess(RxInstrumentedSingleObserver.java:65)
at io.reactivex.internal.operators.single.SingleCreate$Emitter.onSuccess(SingleCreate.java:67)
at io.micronaut.rabbitmq.reactive.RxJavaReactivePublisher$3.handleDelivery(RxJavaReactivePublisher.java:324)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
听众
@RabbitListener
public class ProductListener {
@Queue(ProductTopicConstants.GET_PRODUCTS)
public List<ProductViewModel> find(String text) {
List<ProductViewModel> value = new ArrayList<>();
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
value.add(new ProductViewModel("111-ffff-111","name",100,"description"));
return value;
}
}
制片人
@RabbitClient(ProductTopicConstants.FETE_BIRD_EXCHANGE)
@RabbitProperty(name = "replyTo", value = "amq.rabbitmq.reply-to")
public interface IProductProducer {
@Binding(ProductTopicConstants.GET_PRODUCTS)
Flowable<ProductViewModel> find(String text);
}
控制器
@Get("/{text}")
public Flowable<ProductViewModel> Find(String text) {
return iproductProducer.find(text);
}
对于单个项目列表,不会出现异常,但是,对于超过 1 个值,会出现异常。由于我使用的是Flowable它应该像一个LIST