We are trying to combine a circuit breaker (resilience4j) with a redelivery policy (see the route below):
<?xml version="1.0" encoding="UTF-8"?>
<route xmlns="http://camel.apache.org/schema/spring" id="asyncOneConsumer.out" streamCache="true">
<from uri="kafka:asyncOneConsumer?brokers=localhost:9092&amp;headerDeserializer=#stringKafkaHeaderDeserializer" />
<onException useOriginalMessage="true">
<exception>com.myCompany.exception.RetryException</exception>
<redeliveryPolicy delayPattern="0:100;1:200;2:300;4:400;5:600" logExhausted="false" logExhaustedMessageHistory="false" logStackTrace="false" maximumRedeliveries="5" retryAttemptedLogLevel="DEBUG" />
<to uri="direct:asyncOneConsumer.dlq.error.out" />
</onException>
<onException>
<exception>java.lang.Exception</exception>
<redeliveryPolicy logExhausted="false" logExhaustedMessageHistory="false" logStackTrace="false" />
<bean />
</onException>
<log loggingLevel="DEBUG" message="Consumed: ${body} for route asyncOneConsumer.out" />
<process />
<setHeader name="LOGGING_TYPE">
<constant>OUTBOUND_REQUEST</constant>
</setHeader>
<process />
<process />
<removeHeaders pattern="CamelHttp*" />
<removeHeader headerName="CamelServletContextPath" />
<setHeader name="CamelHttpMethod">
<expressionDefinition>org.apache.camel.model.ProcessorDefinition$2@148b2e07</expressionDefinition>
</setHeader>
<setHeader name="Content-Type">
<expressionDefinition>org.apache.camel.model.ProcessorDefinition$2@75125d76</expressionDefinition>
</setHeader>
<process />
<process />
<circuitBreaker inheritErrorHandler="true">
<resilience4jConfiguration failureRateThreshold="50.0" minimumNumberOfCalls="3" permittedNumberOfCallsInHalfOpenState="10" slidingWindowSize="10" slowCallDurationThreshold="60" slowCallRateThreshold="100.0" waitDurationInOpenState="10">
<timeoutEnabled>true</timeoutEnabled>
<timeoutDuration>30000</timeoutDuration>
</resilience4jConfiguration>
<process />
<setHeader name="LOGGING_TYPE">
<constant>OUTBOUND_RESPONSE</constant>
</setHeader>
<process />
</circuitBreaker>
</route>
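We noticed that if an exchange is in the middle of its retries when the CB opens, the onRedelivery processor keeps being invoked (see the log below). However, the processor that actually performs the outbound call is not invoked.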
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #1 of 5 due to cause: Error is retryable. HTTP_STATUS_CODE = null. ]
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #2 of 5 due to cause: Error is retryable. HTTP_STATUS_CODE = null. ]
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #3 of 5 due to cause: Error is retryable. HTTP_STATUS_CODE = null. ]
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #4 of 5 due to cause: CircuitBreaker 'circuitBreaker1' is OPEN and does not permit further calls]
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #5 of 5 due to cause: CircuitBreaker 'circuitBreaker1' is OPEN and does not permit further calls]
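This is really confusing, especially as we increase the number of Kafka consumers. Is this the intended behavior? Can it be handled in some other way? We tried "retryUntil", but it is only invoked during legitimate retries (those caused by a RetryException, not those caused by the CB exception).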
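One variation we have not verified is a dedicated onException clause for the exception resilience4j raises while the breaker is OPEN, so that those attempts are not redelivered. The fragment below is only a sketch under assumptions: it assumes the exception reaches the error handler as io.github.resilience4j.circuitbreaker.CallNotPermittedException (not wrapped in RetryException by one of our processors), and that Camel picks up this clause's policy for an exchange that is already part of the way through its redeliveries. Reusing the DLQ endpoint from the route above is also just an illustrative choice.

```xml
<!-- Hypothetical extra clause, intended to sit next to the existing onException blocks in the route. -->
<onException>
  <!-- Exception type resilience4j throws when the circuit breaker is OPEN and rejects the call -->
  <exception>io.github.resilience4j.circuitbreaker.CallNotPermittedException</exception>
  <!-- Assumption: zero redeliveries stops the retry loop once the breaker rejects calls -->
  <redeliveryPolicy maximumRedeliveries="0" logExhausted="false" logExhaustedMessageHistory="false" logStackTrace="false" />
  <!-- Illustrative: send the exchange to the same DLQ endpoint used for exhausted RetryException retries -->
  <to uri="direct:asyncOneConsumer.dlq.error.out" />
</onException>
```

If our processors wrap the CB exception inside a RetryException, this clause might never match, which would fit the log above, where retries #4 and #5 are still attributed to the retry policy.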