I am using Spring Cloud Stream with spring-cloud-starter-stream-kafka. I have bound my channels to Kafka topics in application.properties as follows:
spring.cloud.stream.bindings.gatewayOutput.destination=received
spring.cloud.stream.bindings.enrichingInput.destination=received
spring.cloud.stream.bindings.enrichingOutput.destination=enriched
spring.cloud.stream.bindings.redeemingInput.destination=enriched
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed
spring.cloud.stream.bindings.error.destination=errors12
spring.cloud.stream.bindings.errorInput.destination=errors12
spring.cloud.stream.bindings.errorOutput.destination=errors12
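To make the wiring in these properties easier to see, here is a small plain-Java sketch that groups the binding names by the topic they point at. It only parses the text with java.util.Properties and does not use Spring at all; the class name BindingTopology and the method channelsByTopic are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, not part of Spring Cloud Stream.
public class BindingTopology {
    private static final String PREFIX = "spring.cloud.stream.bindings.";
    private static final String SUFFIX = ".destination";

    /** Groups binding (channel) names by the destination topic they are bound to. */
    public static Map<String, List<String>> channelsByTopic(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, List<String>> byTopic = new TreeMap<>();
        // Iterate keys in sorted order so each channel list is alphabetical.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            boolean isDestination = key.startsWith(PREFIX) && key.endsWith(SUFFIX)
                    && key.length() > PREFIX.length() + SUFFIX.length();
            if (!isDestination) {
                continue; // ignore unrelated properties
            }
            String channel = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
            byTopic.computeIfAbsent(props.getProperty(key), t -> new ArrayList<>()).add(channel);
        }
        return byTopic;
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                "spring.cloud.stream.bindings.gatewayOutput.destination=received",
                "spring.cloud.stream.bindings.enrichingInput.destination=received",
                "spring.cloud.stream.bindings.enrichingOutput.destination=enriched",
                "spring.cloud.stream.bindings.redeemingInput.destination=enriched",
                "spring.cloud.stream.bindings.redeemingOutput.destination=redeemed",
                "spring.cloud.stream.bindings.fulfillingInput.destination=redeemed",
                "spring.cloud.stream.bindings.error.destination=errors12",
                "spring.cloud.stream.bindings.errorInput.destination=errors12",
                "spring.cloud.stream.bindings.errorOutput.destination=errors12");
        // Print one line per topic with the channels bound to it.
        channelsByTopic(config).forEach(
                (topic, channels) -> System.out.println(topic + " <- " + channels));
    }
}
```

Read this way, each stage's output shares a topic with the next stage's input, and errors12 is the destination for three bindings: error, errorInput and errorOutput.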
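I cannot get my program to produce exception messages to the error channel. Surprisingly, it does not even seem to attempt to produce them, even though I am on a different thread (I have a @MessagingGateway that dumps messages into gatewayOutput, and the rest of the flow then happens asynchronously). Here is my ServiceActivator definition: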
@Named
@Configuration
@EnableBinding(Channels.class)
@EnableIntegration
public class FulfillingServiceImpl extends AbstractBaseService implements FulfillingService {

    @Override
    @Audit(value = "annotatedEvent")
    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false")
    public void fulfill(TrivialRedemption redemption) throws Exception {
        logger.error("FULFILLED!!!!!!");
        throw new Exception("test exception");
    }
}
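Here is the log that is produced (I have truncated the full exception). There is no...
- complaint about errorChannel not having any subscribers
- Kafka producer thread logging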
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler@2b461688 received message: GenericMessage [payload=byte[400], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) received message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationEvaluationContext' - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationConversionService' - {}
2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$$9dad62:42 - FULFILLED!!!!!! - {}
2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {}
... ...
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from partition [topic='enriched', id=0]@18 - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from partition [topic='redeemed', id=0]@18 - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from partition [topic='errors12', id=0]@0 - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
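One possible reading of the log above, offered as a guess rather than a diagnosis: every pool-6-thread-1 entry, from the ReceivingHandler through the DirectChannel and the ServiceActivatingHandler to the LoggingErrorHandler, is on the same thread. A DirectChannel invokes its handler on the sender's thread, so an exception thrown in fulfill() would travel back up that call stack to whatever delivered the message, which here appears to log it and go no further. The sketch below is a plain-Java model of that control flow only. It is not Spring Integration or binder code, and DirectChannelModel and consume are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of same-thread dispatch; all names here are hypothetical.
public class DirectDispatchModel {

    /** Stand-in for a channel that calls its handler on the sender's thread. */
    public static final class DirectChannelModel {
        private final Consumer<String> handler;

        public DirectChannelModel(Consumer<String> handler) {
            this.handler = handler;
        }

        public void send(String payload) {
            // No thread hand-off: a handler exception propagates to the caller of send().
            handler.accept(payload);
        }
    }

    /**
     * Stand-in for a consumer loop that logs handler failures itself.
     * The errorChannel list is deliberately never written to, mirroring
     * the behaviour the log seems to show.
     */
    public static List<String> consume(DirectChannelModel input,
                                       List<String> errorChannel,
                                       String payload) {
        List<String> log = new ArrayList<>();
        try {
            input.send(payload);
        } catch (RuntimeException e) {
            log.add("Error while processing: " + payload + " (" + e.getMessage() + ")");
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> errorChannel = new ArrayList<>();
        DirectChannelModel fulfillingInput = new DirectChannelModel(payload -> {
            throw new IllegalStateException("test exception");
        });
        List<String> log = consume(fulfillingInput, errorChannel, "redemption");
        System.out.println("log entries: " + log.size());
        System.out.println("errorChannel entries: " + errorChannel.size());
    }
}
```

If this model matches what the binder does, nothing would ever be sent to the error channel unless something in the consuming path is explicitly set up to catch the exception and forward it there.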
EDIT: Here are the contents of my Channels class:
public interface Channels {

    public static final String GATEWAY_OUTPUT = "gatewayOutput";
    public static final String ENRICHING_INPUT = "enrichingInput";
    public static final String ENRICHING_OUTPUT = "enrichingOutput";
    public static final String REDEEMING_INPUT = "redeemingInput";
    public static final String REDEEMING_OUTPUT = "redeemingOutput";
    public static final String FULFILLING_INPUT = "fulfillingInput";
    public static final String FULFILLING_OUTPUT = "fulfillingOutput";

    @Output(GATEWAY_OUTPUT)
    MessageChannel gatewayOutput();

    @Input(ENRICHING_INPUT)
    MessageChannel enrichingInput();

    @Output(ENRICHING_OUTPUT)
    MessageChannel enrichingOutput();

    @Input(REDEEMING_INPUT)
    MessageChannel redeemingInput();

    @Output(REDEEMING_OUTPUT)
    MessageChannel redeemingOutput();

    @Input(FULFILLING_INPUT)
    MessageChannel fulfillingInput();

    @Output(FULFILLING_OUTPUT)
    MessageChannel fulfillingOutput();