1

我正在使用 Quarkus v2.3.0 和 Kafka 扩展进行响应式消息传递。我复制了这篇博文中给出的忽略失败策略示例https://quarkus.io/blog/kafka-failure-strategy/#the-ignore-strategy

我尝试使用 Message<> 包装器签名将其应用于前导方法(使用 @Incoming 和 @Outoging)。但是当抛出异常时,我得到以下错误并且流肯定停止,这不是忽略失败策略的预期行为:

    [Exception 0] java.lang.IllegalArgumentException: I don't like movie with ' in their title: io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord@4e76ab1c
    [Exception 1] io.smallrye.reactive.messaging.ProcessingException: SRMSG00103: Exception thrown when calling the method org.acme.MovieProcessor#consume
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:89)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:65)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:44)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:40)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage$CompletionStageUniSubscription.forwardResult(UniCreateFromCompletionStage.java:63)
    ...
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: io.smallrye.reactive.messaging.ProcessingException: SRMSG00103: Exception thrown when calling the method org.acme.MovieProcessor#consume
        at io.smallrye.reactive.messaging.ProcessorMediator.handlePostInvocationWithMessage(ProcessorMediator.java:339)
        at io.smallrye.context.impl.wrappers.SlowContextualBiFunction.apply(SlowContextualBiFunction.java:21)
        at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:86)
        ... 91 more
    Caused by: java.lang.IllegalArgumentException: I don't like movie with ' in their title: io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord@4e76ab1c
        at org.acme.MovieProcessor.consume(MovieProcessor.java:46)
        at org.acme.MovieProcessor_Subclass.consume$$superforward1(MovieProcessor_Subclass.zig:180)
        at org.acme.MovieProcessor_Subclass$$function$$3.apply(MovieProcessor_Subclass$$function$$3.zig:33)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:54)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.proceed(InvocationInterceptor.java:62)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor.monitor(InvocationInterceptor.java:49)
        at io.quarkus.arc.runtime.devconsole.InvocationInterceptor_Bean.intercept(InvocationInterceptor_Bean.zig:521)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
        at org.acme.MovieProcessor_Subclass.consume(MovieProcessor_Subclass.zig:428)
        at org.acme.MovieProcessor_ClientProxy.consume(MovieProcessor_ClientProxy.zig:188)
        at org.acme.MovieProcessor_SmallRyeMessagingInvoker_consume_e1df8366ae8945053ea77ff7477c27c43c0fd3ba.invoke(MovieProcessor_SmallRyeMessagingInvoker_consume_e1df8366ae8945053ea77ff7477c27c43c0fd3ba.zig:48)
        at io.smallrye.reactive.messaging.AbstractMediator.invoke(AbstractMediator.java:94)
        at io.smallrye.reactive.messaging.ProcessorMediator.lambda$processMethodReturningIndividualMessageAndConsumingIndividualItem$11(ProcessorMediator.java:270)
        at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:36)
        ... 88 more
    [CIRCULAR REFERENCE:java.lang.IllegalArgumentException: I don't like movie with ' in their title: io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord@4e76ab1c]

这是处理器方法:

    @Incoming("movies")
    @Outgoing("next")
    public Message<String> consume(Message<String> movie) {

        LOGGER.infof("Receiving movie %s", movie);
        if (movie.getPayload().contains("'")) {
            throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie);
        }
        if (movie.getPayload().contains(",")) {
            throw new IllegalArgumentException("I don't like movie with , in their title: " + movie);
        }
        return movie.withPayload(movie.getPayload());
    }

相同的代码正在使用有效负载签名:

@Incoming("movies")
@Outgoing("next")
public String consume(String movie) {...}

记录了异常,但忽略了错误,并且电影主题的消费按预期继续。

我错过了什么 ?为什么不忽略异常?

4

0 回答 0