0

这是这个问题的续集。我可以将“普通”Apache Kafka Binder 与功能模型一起使用吗?到目前为止,我使用基于注释的配置混合了这两种配置,以便在一个应用程序中spring-cloud-stream-binder-kafka进行简单的消费/生产和spring-cloud-stream-binder-kafka-streams高级流处理。

功能模型似乎仅由streams活页夹支持,如果我尝试混合使用这两种方法 - 基于注释的简单用法和功能的流,流绑定未注册。

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            simple-binding-in:
              destination: another-topic

public interface SimpleBinding {

    String INPUT = "simple-binding-in";

    @Input(INPUT)
    SubscribableChannel simpleIn();

}

@Component
public class SimpleListener {

    @StreamListener(SimpleBinding.INPUT)
    public void listen(@Payload SomeDto payload) {
    }
}

@Configuration
public class FunctionalStream {

    @Bean
    public Consumer<KStream<String>> processStream() {
        return eventStream -> eventStream.map()
    }
}

@EnableBinding(SimpleBinding.class)存在于配置类中。是否首选/支持按所述混合两者,或者我是否应该将其streams-binder用于简单的消息消费?

4

1 回答 1

0

对于 Kafka Binder,您可以而且绝对应该使用功能模型,而完全忘记 StreamListener。这样,它将与您的 KStream 功能模型保持一致。

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            listen-in-0:
              destination: another-topic

@Component
public class SimpleListener {

    @Bean
    public Consumer<SomeDto> listen() {
        return payload -> ...
    }
}

@Configuration
public class FunctionalStream {

    @Bean
    public Consumer<KStream<String>> processStream() {
        return eventStream -> eventStream.map()
    }
}
于 2019-10-16T10:25:02.243 回答