我遇到了一个棘手的 NPE,它出现在使用 Quarkus 1.6.1 和 SmallRye 反应式消息传递框架消费 Kafka 主题时。有趣的是,在较低的消息速率(约 100 条消息/秒)下它不会发生,似乎只在较高的消息速率(300 条消息/秒以上)下才会发生。测试环境是一个简单的 Quarkus Kafka 消息生产者,以及一个订阅同一主题并使用反应式注解(@Outgoing、@Incoming)的消费者。错误是在我启动测试应用程序(mvnw compile quarkus:dev)时抛出的。经过几次重试后,应用程序开始按预期工作。
请查看下面消费 Kafka 记录的 bean。
/**
 * Handles one record received on the "targets" channel and returns the map
 * that is published on the "parameters-map" channel.
 *
 * <p>The record's key and payload are read and passed, in that order, to
 * {@code deserializer.deserializeTarget}; its result is returned unchanged.
 *
 * @param target the incoming Kafka record whose key and payload are deserialized
 * @return the value produced by the deserializer for this record
 */
@Incoming("targets")
@Outgoing("parameters-map")
public Map<String, Object> processTarget(KafkaRecord<String, String> target) {
    final String recordKey = target.getKey();
    final String recordPayload = target.getPayload();
    final Map<String, Object> parameters =
            this.deserializer.deserializeTarget(recordKey, recordPayload);
    return parameters;
}
OpenJDK 11
12:23:53 ERROR [io.sm.re.me.provider] (RxComputationThreadPool-1) SRMSG00201: Error caught during the stream processing: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
[Exception 0] java.lang.NullPointerException
[Exception 1] io.smallrye.reactive.messaging.ProcessingException: SRMSG00103: Exception thrown when calling the method xxx.was.ingestion.indexer.SolrSink#processTarget
at io.smallrye.mutiny.operators.UniOnItemOrFailureFlatMap.invokeAndSubstitute(UniOnItemOrFailureFlatMap.java:34)
at io.smallrye.mutiny.operators.UniOnItemOrFailureFlatMap$1.onFailure(UniOnItemOrFailureFlatMap.java:62)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onFailure$2(ContextPropagationUniInterceptor.java:40)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onFailure(ContextPropagationUniInterceptor.java:40)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.onFailure(UniSerializedSubscriber.java:85)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onFailure$2(ContextPropagationUniInterceptor.java:40)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onFailure(ContextPropagationUniInterceptor.java:40)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.onFailure(UniSerializedSubscriber.java:85)
at io.smallrye.mutiny.operators.UniOnItemMap$1.onItem(UniOnItemMap.java:35)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:35)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:35)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:72)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:35)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:35)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:72)
at io.smallrye.mutiny.operators.UniCreateFromCompletionStage.lambda$forwardFromCompletionStage$1(UniCreateFromCompletionStage.java:30)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:143)
at io.smallrye.mutiny.operators.UniCreateFromCompletionStage.forwardFromCompletionStage(UniCreateFromCompletionStage.java:22)
at io.smallrye.mutiny.operators.UniCreateFromCompletionStage.subscribing(UniCreateFromCompletionStage.java:50)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:51)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:51)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
at io.smallrye.mutiny.operators.UniOnItemMap.subscribing(UniOnItemMap.java:19)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:51)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:51)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
at io.smallrye.mutiny.operators.UniOnItemOrFailureFlatMap.subscribing(UniOnItemOrFailureFlatMap.java:48)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:51)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:51)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
at io.smallrye.mutiny.operators.UniProduceMultiOnItem.subscribe(UniProduceMultiOnItem.java:36)
at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:43)
at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:48)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.lambda$subscribe$0(ContextPropagationMultiInterceptor.java:58)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.subscribe(ContextPropagationMultiInterceptor.java:58)
at io.smallrye.mutiny.groups.MultiCreate$1.subscribe(MultiCreate.java:159)
at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:48)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.lambda$subscribe$0(ContextPropagationMultiInterceptor.java:58)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.subscribe(ContextPropagationMultiInterceptor.java:58)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onItem(MultiFlatMapOp.java:191)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.subscription.SerializedSubscriber.onItem(SerializedSubscriber.java:69)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:87)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:87)
at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:81)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.streams.utils.ConnectableProcessor.onNext(ConnectableProcessor.java:122)
at io.smallrye.mutiny.streams.utils.WrappedProcessor.onNext(WrappedProcessor.java:44)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:87)
at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
at io.reactivex.internal.operators.flowable.FlowableOnBackpressureDrop$BackpressureDropSubscriber.onNext(FlowableOnBackpressureDrop.java:84)
at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:93)
at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:38)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)