I am using Quarkus v2.3.0 with the Kafka extension for reactive messaging. I copied the ignore failure strategy example from this blog post: https://quarkus.io/blog/kafka-failure-strategy/#the-ignore-strategy
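For context, the ignore strategy is enabled per incoming channel through configuration. A minimal sketch, assuming the channel is named movies (as in the processor method in this post) and that the setting lives in application.properties:

```properties
# Assumption: the incoming channel is called "movies" and is backed by the Kafka connector.
# With "ignore", a nacked record is logged and consumption moves on to the next record.
mp.messaging.incoming.movies.failure-strategy=ignore
```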
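I tried to apply it to a processor method (annotated with @Incoming and @Outgoing) that uses the Message<> wrapper signature. But when the exception is thrown, I get the following error and the stream stops, which is not the expected behavior of the ignore failure strategy: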
[Exception 0] java.lang.IllegalArgumentException: I don't like movie with ' in their title: io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord@4e76ab1c
[Exception 1] io.smallrye.reactive.messaging.ProcessingException: SRMSG00103: Exception thrown when calling the method org.acme.MovieProcessor#consume
at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:89)
at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:65)
at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:44)
at io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:40)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage$CompletionStageUniSubscription.forwardResult(UniCreateFromCompletionStage.java:63)
...
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: io.smallrye.reactive.messaging.ProcessingException: SRMSG00103: Exception thrown when calling the method org.acme.MovieProcessor#consume
at io.smallrye.reactive.messaging.ProcessorMediator.handlePostInvocationWithMessage(ProcessorMediator.java:339)
at io.smallrye.context.impl.wrappers.SlowContextualBiFunction.apply(SlowContextualBiFunction.java:21)
at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:86)
... 91 more
Caused by: java.lang.IllegalArgumentException: I don't like movie with ' in their title: io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord@4e76ab1c
at org.acme.MovieProcessor.consume(MovieProcessor.java:46)
at org.acme.MovieProcessor_Subclass.consume$$superforward1(MovieProcessor_Subclass.zig:180)
at org.acme.MovieProcessor_Subclass$$function$$3.apply(MovieProcessor_Subclass$$function$$3.zig:33)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:49)
at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(InvocationInterceptor_Bean.zig:521)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at org.acme.MovieProcessor_Subclass.consume(MovieProcessor_Subclass.zig:428)
at org.acme.MovieProcessor_ClientProxy.consume(MovieProcessor_ClientProxy.zig:188)
at org.acme.MovieProcessor_SmallRyeMessagingInvoker_consume_e1df8366ae8945053ea77ff7477c27c43c0fd3ba.invoke(MovieProcessor_SmallRyeMessagingInvoker_consume_e1df8366ae8945053ea77ff7477c27c43c0fd3ba.zig:48)
at io.smallrye.reactive.messaging.AbstractMediator.invoke(AbstractMediator.java:94)
at io.smallrye.reactive.messaging.ProcessorMediator.lambda$processMethodReturningIndividualMessageAndConsumingIndividualItem$11(ProcessorMediator.java:270)
at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
at io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:36)
... 88 more
[CIRCULAR REFERENCE:java.lang.IllegalArgumentException: I don't like movie with ' in their title: io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord@4e76ab1c]
Here is the processor method:
@Incoming("movies")
@Outgoing("next")
public Message<String> consume(Message<String> movie) {
    LOGGER.infof("Receiving movie %s", movie);
    if (movie.getPayload().contains("'")) {
        throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie);
    }
    if (movie.getPayload().contains(",")) {
        throw new IllegalArgumentException("I don't like movie with , in their title: " + movie);
    }
    return movie.withPayload(movie.getPayload());
}
The same code works with the payload signature:
@Incoming("movies")
@Outgoing("next")
public String consume(String movie) {...}
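The exception is logged, but the failure is ignored and consumption of the movies topic continues as expected.
What am I missing? Why is the exception not ignored with the Message<> signature?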