We are trying to combine a circuit breaker (resilience4j) with a redelivery policy (see the route below).

<?xml version="1.0" encoding="UTF-8"?>
<route xmlns="http://camel.apache.org/schema/spring" id="asyncOneConsumer.out" streamCache="true">
   <from uri="kafka:asyncOneConsumer?brokers=localhost:9092&amp;headerDeserializer=#stringKafkaHeaderDeserializer" />
   <onException useOriginalMessage="true">
      <exception>com.myCompany.exception.RetryException</exception>
      <redeliveryPolicy delayPattern="0:100;1:200;2:300;4:400;5:600" logExhausted="false" logExhaustedMessageHistory="false" logStackTrace="false" maximumRedeliveries="5" retryAttemptedLogLevel="DEBUG" />
      <to uri="direct:asyncOneConsumer.dlq.error.out" />
   </onException>
   <onException>
      <exception>java.lang.Exception</exception>
      <redeliveryPolicy logExhausted="false" logExhaustedMessageHistory="false" logStackTrace="false" />
      <bean />
   </onException>
   <log loggingLevel="DEBUG" message="Consumed: ${body} for route asyncOneConsumer.out" />
   <process />
   <setHeader name="LOGGING_TYPE">
      <constant>OUTBOUND_REQUEST</constant>
   </setHeader>
   <process />
   <process />
   <removeHeaders pattern="CamelHttp*" />
   <removeHeader headerName="CamelServletContextPath" />
   <setHeader name="CamelHttpMethod">
      <expressionDefinition>org.apache.camel.model.ProcessorDefinition$2@148b2e07</expressionDefinition>
   </setHeader>
   <setHeader name="Content-Type">
      <expressionDefinition>org.apache.camel.model.ProcessorDefinition$2@75125d76</expressionDefinition>
   </setHeader>
   <process />
   <process />
   <circuitBreaker inheritErrorHandler="true">
      <resilience4jConfiguration failureRateThreshold="50.0" minimumNumberOfCalls="3" permittedNumberOfCallsInHalfOpenState="10" slidingWindowSize="10" slowCallDurationThreshold="60" slowCallRateThreshold="100.0" waitDurationInOpenState="10">
         <timeoutEnabled>true</timeoutEnabled>
         <timeoutDuration>30000</timeoutDuration>
      </resilience4jConfiguration>
      <process />
      <setHeader name="LOGGING_TYPE">
         <constant>OUTBOUND_RESPONSE</constant>
      </setHeader>
      <process />
   </circuitBreaker>
</route>

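We noticed that if an exchange is in the middle of its retries when the CB opens, the onRedelivery processor continues to be invoked (see the logs). However, the processor that actually performs the outbound call is not invoked.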

[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #1 of 5 due to cause:  Error is retryable. HTTP_STATUS_CODE = null. ] 
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #2 of 5 due to cause:  Error is retryable. HTTP_STATUS_CODE = null. ] 
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #3 of 5 due to cause:  Error is retryable. HTTP_STATUS_CODE = null. ] 
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #4 of 5 due to cause:  CircuitBreaker 'circuitBreaker1' is OPEN and does not permit further calls]
[ead #1 - KafkaConsumer[asyncOneConsumer]] | LoggingProcessor| [0000000007] | [RETRY #5 of 5 due to cause:  CircuitBreaker 'circuitBreaker1' is OPEN and does not permit further calls] 

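This is really confusing, especially when we increase the number of Kafka consumers. Is this the expected behavior? Can it be handled in some other way? We tried "retryUntil", but it is only invoked during legitimate retries (those with RetryException, not those with the CB exception).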
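
For illustration, here is a minimal, untested sketch of one direction that might be worth exploring: a dedicated onException clause for the open-breaker rejection with redelivery switched off. It rests on several assumptions that are not confirmed by the route or the logs above: that the rejection surfaces as resilience4j's `io.github.resilience4j.circuitbreaker.CallNotPermittedException`, that Camel selects this clause over the `java.lang.Exception` one once that exception appears mid-redelivery, and that routing the exchange to the existing `direct:asyncOneConsumer.dlq.error.out` endpoint is acceptable for rejected calls.

```xml
<!-- Sketch only. Assumes the open-breaker rejection is thrown as
     resilience4j's CallNotPermittedException and that this clause
     is matched instead of the generic java.lang.Exception clause. -->
<onException>
   <exception>io.github.resilience4j.circuitbreaker.CallNotPermittedException</exception>
   <!-- No further redelivery attempts once the breaker rejects the call. -->
   <redeliveryPolicy maximumRedeliveries="0" logExhausted="false" logExhaustedMessageHistory="false" logStackTrace="false" />
   <!-- Assumption: mark the exchange as handled so the consumer moves on. -->
   <handled>
      <constant>true</constant>
   </handled>
   <!-- Endpoint reused from the route above; a separate endpoint may fit better. -->
   <to uri="direct:asyncOneConsumer.dlq.error.out" />
</onException>
```

Whether this actually stops the RETRY #4 and #5 log entries would need to be verified against the Camel version in use, since the redelivery counter and policy are carried across attempts.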
