1
<int:service-activator input-channel="toKafka"  ref="conditionalProducerService" method="producerCircuitBreaker">

  <int:request-handler-advice-chain>
       <ref bean="circuitBreakerAdvice1" />
   </int:request-handler-advice-chain>
            </int:service-activator>

  <int:channel id="failedChannel2" />
  <int-kafka:outbound-channel-adapter
                            id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext" auto-startup="false" channel="toKafka" message-key="kafka_messageKey">
                            <int:poller fixed-delay="1000" error-channel="failedChannel2" />
            </int-kafka:outbound-channel-adapter>


      <int:chain input-channel="failedChannel2">
        <int:transformer expression="'failed:' + payload.failedMessage.payload + ' with ' + payload.cause.message" />
                            <int-stream:stderr-channel-adapter append-newline="true"/>
            </int:chain>

            <bean id="circuitBreakerAdvice1" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
                            <property name="threshold" value="2" />                                         
                            <property name="halfOpenAfter" value="12000" />                     
            </bean>

  public Message<?> producerCircuitBreaker(Message<?> payload) {
          throw  new RuntimeException("foo Pro");}

通过上述配置,我们正在尝试:

1.期望将失败的消息传播到没有发生的错误通道 =“failedChannel2”。因为我在控制台中看不到转换后的输出。

2.CircuitBreaker 正在为 ServiceActivator 工作(对于应用程序相关的异常,如上所述),但是我们如何为出站适配器的失败案例配置 CB。例如:当连接超时或服务器突然关闭/网络连接问题/在将消息从 SI 通道发送到外部(kafka)服务器之前出现一些环境问题。我们可以为这种情况配置带有出站适配器的 CB。

根据关于断路器建议的 SI 文档,如下所示。

“通常,此建议可能用于外部服务,可能需要一些时间才能失败(例如尝试建立网络连接的超时)”。

请就如何实现这一点提出建议。非常感谢。

更新配置:

        <int:gateway default-request-channel="toKafka" error-channel="errorChannel"
default-reply-timeout="0" />

 <int:service-activator input-channel="toKafka">
<bean class="com.XXX.ProducerMessageHandler" >
 <constructor-arg ref="producerContext"/>
</bean>
     <int:request-handler-advice-chain>
                                     <ref bean="circuitBreakerAdvice" />
                       </int:request-handler-advice-chain>

<bean id="transformerService1" class="com.XXX.KafkaTransformerTest" />

 <int:transformer input-channel="errorChannel"
                              order="1" ref="transformerService1" method="transformFailed">

                       </int:transformer>  

 public void transformFailed(Message<?> message) {
          APPLOGGER.log("transformer message test" + message);


 public class ProducerMessageHandler extends KafkaProducerMessageHandler{

            public ProducerMessageHandler(KafkaProducerContext kafkaProducerContext) {
                            super(kafkaProducerContext);
                            // TODO Auto-generated constructor stub
            }

            @Override
            public void handleMessageInternal(final Message<?> message) throws Exception {

                            //super.handleMessageInternal(message);
                            throw new RuntimeException("test foo");
            }

日志 :

01-05@23:44:18,598 调试 org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658598, id =e0591162-3b93-9bb6-0699-89b15b20e904}] 调试:-com.XXX.ProducerMessageHandler#0 收到的消息:GenericMessage [payload=hello, headers={timestamp=1452017658598, id=e0591162-3b93-9bb6-0699-89b15b20e904} ] 出现异常:org.springframework.messaging.MessageHandlingException:消息处理程序 [com.XXX.ProducerMessageHandler#0] 发生错误;嵌套异常是 java.lang.RuntimeException: test foo 01-05@23:44:18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'toKafka', message: GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 01-05@23:44:18,606 调试 org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean $1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 调试:-com.XXX.ProducerMessageHandler#0 收到消息:GenericMessage [payload=hello , headers={timestamp=1452017658605, id=61597941-b2f8-314d-141d-8f2c058dda4d}] 出现异常:org.springframework.messaging.MessageHandlingException: 消息处理程序 [com.XXX.ProducerMessageHandler#0] 发生错误;嵌套异常是 java.lang.RuntimeException: test foo 01-05@23:44:18,606 DEBUG org.springframework.integration.channel。PublishSubscribeChannel - 在通道“toKafka”上预发送,消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}] 01-05@23:44:18,606 DEBUG org.springframework .integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=119afbf1-6104-feb1-eb44-f646aa932277}] 得到异常:org.springframework.messaging.MessageHandlingException:消息处理程序 [org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6] 中发生错误;嵌套异常是 org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice$CircuitBreakerOpenException: Circuit Breaker is Open for org.springframework。integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 01-05@23:44:18,606 调试 org.springframework.integration.channel.PublishSubscribeChannel - 在通道“toKafka”上预发送,消息:GenericMessage [payload=hello,headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}] 01-05@23:44:18,606 调试 org.springframework.integration.config.ServiceActivatorFactoryBean$1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 收到消息:GenericMessage [payload=hello, headers={timestamp=1452017658606, id=8dafe2e0-8efe-c827-e745-1387e6045e7d}] 出现异常:org.springframework.messaging.MessageHandlingException:消息处理程序 [org.springframework.integration.config. ServiceActivatorFactoryBean$1@6a0ef4b6]; 嵌套异常是 org.springframework.integration。

4

2 回答 2

1

该建议仅适用于分配给它的端点,不适用于下游流;不幸的是,kafka 模式不允许将其应用于出站通道适配器。我为此创建了一个JIRA 问题

一种解决方法是将 a 配置KafkaProducerMessageHandler为 a<bean/>ref从 a配置它<service-activator/>。然后你可以应用你的断路器。

另一种解决方法是使用流入网关......

<int:service-activator ... ref="gw">
    <int:request-handler-advice-chain ...

</int:service-activator>

<int:gateway id="gw" default-request-channel="toKafka" 
         default-reply-timeout="0"
         error-channel="..." ... />

我不确定您为什么在错误频道中没有看到消息;通常,打开 DEBUG 日志记录将有助于调试这种事情。

编辑

我刚刚用这个进行了测试,它工作得很好......

<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
    default-reply-timeout="0" />

<int:service-activator input-channel="toKafka">
    <bean class="com.example.Foo" />
    <int:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2"/>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

编辑2

如果您不使用网关,则可以使用队列通道和轮询器来处理它。这对我来说也很好......

<int:channel id="toKafka">
    <int:queue />
</int:channel>

<int:service-activator input-channel="toKafka">
    <bean class="com.example.Foo" />
    <int:poller error-channel="errorChannel" fixed-delay="1000" />
    <int:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2"/>
            <property name="halfOpenAfter" value="12000"/>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

或者,您可以添加一个中间流网关。

于 2015-12-19T19:29:07.383 回答
0

从聊天中复制以供将来参考

山姆:嗨,加里

    for(int i=0;i<4;i++){ 
    try{ 
   toKafka.send(MessageBuilder 
   .withPayload("hello"). 
     build()); 
     }catch(Exception e){ 
     System.out.println("got exception : " + e); } }

这就是我发送消息的方式

Gary:所以你直接发送到频道 - 你应该使用 MessagingGateway 代替。 山姆:嗨,加里。谢谢。它正在与网关合作。

使用KafkaProducerMessageHandler配置 CB很好,但它涵盖了以下方法下的任何故障

公共无效句柄消息内部(最终消息消息)抛出异常

但我想涵盖网络错误的问题以及它没有涵盖的无效代理列表/服务器,我在控制台中遇到这样的异常:

日志

   12-24@16:46:46,250 DEBUGspringframework.integration.kafka.outbound.KafkaProducerMessageHandler - org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0 received message: GenericMessage [payload=TestVo[data=sample message]], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@44286963, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@44286963, kafka_topic=tried_in, kafka_partitionId=2, id=7b596368-0aee-ddaa-2168-dc403e22c38f, timestamp=1450955805294}] 
   12-24@16:55:12,630 WARN apache.kafka.common.network.Selector - Error in I/O with /1.2.0.3 
   java.net.ConnectException: Connection refused: no further information 
   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
   at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) 
   at org.apache.kafka.common.network.Selector.poll(Selector.java:238) 
   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) 
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) 
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) 
   at java.lang.Thread.run(Unknown Source)

在这种情况下也希望CB被调用。

Gary:连接异常应该发生在handleMessageInternal() 中如果没有抛出异常,那就是一个bug。我会看看。

未来在 handleMessageInternal中被丢弃 - 我将打开一个 JIRA 问题。

https://jira.spring.io/browse/INTEXT-218

山姆:好的。当kafka服务器由于某种原因而关闭时,它会覆盖这种情况吗?

加里:是的;但您可能希望减少默认超时(60 秒)

于 2016-01-11T21:04:30.420 回答