<int:service-activator input-channel="toKafka" ref="conditionalProducerService" method="producerCircuitBreaker">
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice1" />
</int:request-handler-advice-chain>
</int:service-activator>
<int:channel id="failedChannel2" />
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext" auto-startup="false" channel="toKafka" message-key="kafka_messageKey">
<int:poller fixed-delay="1000" error-channel="failedChannel2" />
</int-kafka:outbound-channel-adapter>
<int:chain input-channel="failedChannel2">
<int:transformer expression="'failed:' + payload.failedMessage.payload + ' with ' + payload.cause.message" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
<bean id="circuitBreakerAdvice1" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
public Message<?> producerCircuitBreaker(Message<?> payload) {
    throw new RuntimeException("foo Pro");
}
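With the configuration above, we are trying to achieve the following:
1. We expect failed messages to be propagated to the error channel "failedChannel2", but this is not happening: I do not see the transformed output on the console.
2. The CircuitBreaker works for the ServiceActivator (for application-level exceptions, as shown above), but how do we configure a circuit breaker for failures in the outbound adapter? For example: a connection timeout, the server going down abruptly, a network connectivity problem, or some other environmental issue that occurs before the message is sent from the SI channel to the external (Kafka) server. Can we configure a circuit breaker with the outbound adapter for such cases?
The SI documentation on the circuit breaker advice says the following:
"Typically, this advice might be used for external services, where it might take some time to fail (such as a timeout attempting to make a network connection)."
Please advise on how to achieve this. Many thanks.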
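To make the behavior we expect concrete, here is a minimal plain-Java sketch of the circuit breaker pattern with the same two settings as the advice bean above (threshold of 2, half-open after 12000 ms). It is only an illustration of the general pattern, not Spring Integration's implementation: the class SimpleCircuitBreaker, its CircuitOpenException, and the explicit nowMillis parameter are all hypothetical names introduced for this example.

```java
import java.util.function.Supplier;

// Hypothetical illustration of the circuit breaker pattern; not a Spring class.
class SimpleCircuitBreaker {

    // Thrown instead of calling the protected action while the circuit is open.
    static class CircuitOpenException extends RuntimeException {
        CircuitOpenException(String message) {
            super(message);
        }
    }

    private final int threshold;            // consecutive failures before opening
    private final long halfOpenAfterMillis; // wait time before one retry is allowed
    private int failures;
    private long lastFailureMillis;

    SimpleCircuitBreaker(int threshold, long halfOpenAfterMillis) {
        this.threshold = threshold;
        this.halfOpenAfterMillis = halfOpenAfterMillis;
    }

    // The current time is passed in explicitly so the behavior is easy to test.
    synchronized <T> T call(Supplier<T> action, long nowMillis) {
        boolean open = failures >= threshold
                && nowMillis - lastFailureMillis < halfOpenAfterMillis;
        if (open) {
            // Fail fast: the external service is not contacted at all.
            throw new CircuitOpenException("Circuit Breaker is Open");
        }
        try {
            T result = action.get();
            failures = 0; // any success closes the circuit again
            return result;
        } catch (RuntimeException e) {
            failures++;
            lastFailureMillis = nowMillis;
            throw e;
        }
    }

    public static void main(String[] args) {
        SimpleCircuitBreaker breaker = new SimpleCircuitBreaker(2, 12000);
        // Four sends in quick succession against a target that always fails.
        for (long now = 0; now < 4; now++) {
            try {
                breaker.call(() -> {
                    throw new RuntimeException("test foo");
                }, now);
            } catch (RuntimeException e) {
                System.out.println(e.getClass().getSimpleName() + ": " + e.getMessage());
            }
        }
    }
}
```

With these settings, the first two calls reach the failing target and every later call within the 12-second window is rejected without contacting it. That is the behavior we would like to get when the Kafka server itself is unreachable, not only when application code throws.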
Updated configuration:
<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
default-reply-timeout="0" />
<int:service-activator input-channel="toKafka">
<bean class="com.XXX.ProducerMessageHandler" >
<constructor-arg ref="producerContext"/>
</bean>
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice" />
</int:request-handler-advice-chain>
</int:service-activator>
<bean id="transformerService1" class="com.XXX.KafkaTransformerTest" />
<int:transformer input-channel="errorChannel"
order="1" ref="transformerService1" method="transformFailed">
</int:transformer>
public void transformFailed(Message<?> message) {
    APPLOGGER.log("transformer message test" + message);
}
public class ProducerMessageHandler extends KafkaProducerMessageHandler{
public ProducerMessageHandler(KafkaProducerContext kafkaProducerContext) {
super(kafkaProducerContext);
// TODO Auto-generated constructor stub
}
@Override
public void handleMessageInternal(final Message<?> message) throws Exception {
//super.handleMessageInternal(message);
throw new RuntimeException("test foo");
}
}
Logs:
01-05@23:44:18,598 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 received message: GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e904}]
DEBUG: - com.XXX.ProducerMessageHandler#0 received message: GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e904}]
Got exception: org.springframework.messaging.MessageHandlingException: error occurred in message handler [com.XXX.ProducerMessageHandler#0]; nested exception is java.lang.RuntimeException: test foo
01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'toKafka', message: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}]
01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 received message: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}]
DEBUG: - com.XXX.ProducerMessageHandler#0 received message: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}]
Got exception: org.springframework.messaging.MessageHandlingException: error occurred in message handler [com.XXX.ProducerMessageHandler#0]; nested exception is java.lang.RuntimeException: test foo
01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'toKafka', message: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}]
01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 received message: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}]
Got exception: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6]; nested exception is org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException: Circuit Breaker is Open for org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6
01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'toKafka', message: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}]
01-05@23:44:18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 received message: GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}]
Got exception: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6]; nested exception is org.springframework.integration.