2

我正在尝试将 Spring Cloud Stream 与 Spring Cloud 功能 webflux 集成

因为他们在未来的版本中弃用 Spring Cloud 反应流,所以我正在尝试使用 Spring Cloud 功能 https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.2.RELEASE/single/spring -cloud-stream.html#spring-cloud-stream-preface-notable-deprecations

Spring Cloud Web 函数可以使用文档中的路径公开其函数的端点

https://cloud.spring.io/spring-cloud-static/spring-cloud-function/1.0.0.RELEASE/single/spring-cloud-function.html

从云流中我可以看到源需要定义为供应商https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.2.RELEASE/single/spring-cloud-stream。 html#_spring_cloud_function

但我的用例是从响应式 http 端点获取 POST 数据并摄取到 kafka,有没有办法从 spring cloud function web 和 spring cloud stream 实现它?

来自带有弹簧云流的弹簧云功能的文档

@SpringBootApplication
@EnableBinding(Source.class)
public static class SourceFromSupplier {
    public static void main(String[] args) {
        SpringApplication.run(SourceFromSupplier.class, "--spring.cloud.stream.function.definition=date");
    }
    @Bean
    public Supplier<Date> date() {
        return () -> new Date(12345L);
    }
}

如果我运行这个,我可以看到日期每 1 秒插入一次 kafka,如果我调用 localhost:/8080/date 之类的供应商的 get 端点会导致日期响应,有没有办法从 post 注入 paylaod 到带有弹簧云功能的kafka?

4

1 回答 1

1

您的问题有助于发现一个问题,它与函数和流提供的自动配置之间的生命周期不一致有关。该问题以 Spring Cloud Functions 创建的休息点无法看到绑定的方式表现出来,因为它是更早创建的

所以我们很快就会解决这个问题。同时有一个解决方法需要您output从 ApplicationContext 访问通道(见下文):

@SpringBootApplication
@EnableBinding(Source.class)
public class SimpleFunctionRabbitDemoApplication {

  public static void main(String[] args) throws Exception {      
    SpringApplication.run(SimpleFunctionRabbitDemoApplication.class);
  }

  @Bean
  public Consumer<String> storeSync(ApplicationContext context) {
     return v -> {
        MessageChannel channel = context.getBean(Source.OUTPUT, MessageChannel.class);
        channel.send(MessageBuilder.withPayload(v).build());
     };
  }
}
于 2019-03-27T05:58:27.927 回答